package lmaxapi import "tickserver/api/lmaxapi/request" import "tickserver/api/lmaxapi/response" import "sync" import "reflect" import "time" import "log" import "errors" //会发生一个线程写,一个线程读的情况,暂时先全部加锁 type OrderBook struct { mu sync.Mutex orderevent func(string, *response.OrderEvent) positionevent func(string, *response.PositionEvent) orderbookstatusevent func(string, *response.OrderBookStatusEvent) rejectedevent func(string, *response.InstructionRejectedEvent) rejectedevents map[string]func(string, *response.InstructionRejectedEvent) accountevent func(string, *response.AccountStateEvent) orderbookevent func(*response.OrderBookEvent) events map[string][]func(string, *response.OrderEvent) orderIdToInstId map[string]string mtf *MtfClient } //在账户登出又重新登录的情况下,我们需要做特殊的处理 //价格还是简单的被重写 //accountStatus 也可以简单的被set //但是order 和 position 要进行下面的操作: //增加一个init事件,这个事件是用户的一个快照。0的事件 func NewOrderBook(mtf *Mtf) *OrderBook { //log.Println("NewOrderBook") ob := &OrderBook{} ob.events = make(map[string][]func(string, *response.OrderEvent)) ob.orderIdToInstId = make(map[string]string) ob.rejectedevents = make(map[string]func(string, *response.InstructionRejectedEvent)) ob.mtf = NewMtfClient(mtf, time.Second, false, 1024) go ob.Start() err := ob.mtf.ConnectPrivate() if err != nil { log.Println(err) return nil } err = ob.mtf.ConnectPublic() if err != nil { log.Println(err) return nil } return ob } func (this *OrderBook) Stop() { this.mtf.Close() this.mtf.GetMtf().Close() } //如果在多线程环境下,或者要修改,那么要采用copy模式 func (this *OrderBook) GetOrders(iscopy bool) []*response.OrderEvent { return this.GetOrdersUnsafe(iscopy) } func (this *OrderBook) GetOrdersUnsafe(iscopy bool) []*response.OrderEvent { return this.mtf.GetOrders(iscopy) } //对某个订单添加监听事件 func (this *OrderBook) AddEvent(instructId string, cb func(status string, event *response.OrderEvent)) { this.mu.Lock() defer this.mu.Unlock() //查找相关的事件 instrunctId events, id := this.findEvent(instructId) orders := this.GetOrders(true) if id == "" { //如果没有找到,那么添加事件 id = instructId for i := 0; i < len(orders); i++ { if orders[i].InstructionId == instructId { this.orderIdToInstId[orders[i].OrderId] = instructId break } } } for i := 0; i < len(events); i++ { if reflect.ValueOf(cb).Pointer() == reflect.ValueOf(events[i]).Pointer() { return } } this.events[id] = append(events, cb) } func (this *OrderBook) AddRejectEvent(instructId string, cb func(status string, event *response.InstructionRejectedEvent)) { this.mu.Lock() defer this.mu.Unlock() this.rejectedevents[instructId] = cb } func (this *OrderBook) SetOrderBookEvent(cb func(event *response.OrderBookEvent)) { this.mu.Lock() defer this.mu.Unlock() this.orderbookevent = cb } func (this *OrderBook) ClearEvent(instrunctId string, cb func(status string, event *response.OrderEvent)) { this.mu.Lock() defer this.mu.Unlock() events, id := this.findEvent(instrunctId) if id == "" { //如果没有找到,那么无法删除事件 return } for i := 0; i < len(events); i++ { if reflect.ValueOf(cb).Pointer() == reflect.ValueOf(events[i]).Pointer() { events = append(events[:i], events[i+1:]...) break } } this.events[id] = events } func (this *OrderBook) SetOrderEvent(cb func(status string, event *response.OrderEvent)) { this.mu.Lock() defer this.mu.Unlock() this.orderevent = cb } func (this *OrderBook) SetRejectedEvent(cb func(status string, event *response.InstructionRejectedEvent)) { this.mu.Lock() defer this.mu.Unlock() this.rejectedevent = cb } func (this *OrderBook) SetOrderBookStatusEvent(cb func(status string, event *response.OrderBookStatusEvent)) { this.mu.Lock() defer this.mu.Unlock() this.orderbookstatusevent = cb } func (this *OrderBook) SetPositionEvent(cb func(status string, event *response.PositionEvent)) { this.mu.Lock() defer this.mu.Unlock() this.positionevent = cb } func (this *OrderBook) SetAccountEvent(cb func(status string, event *response.AccountStateEvent)) { this.mu.Lock() defer this.mu.Unlock() this.accountevent = cb } func (this *OrderBook) SetRejected(event *response.InstructionRejectedEvent) { this.mtf.SetRejected(event) if this.rejectedevent != nil { this.rejectedevent("SystemUpdate", event) } if cb, ok := this.rejectedevents[event.InstructionId]; ok { cb("SystemUpdate", event) delete(this.rejectedevents, event.InstructionId) } } func (this *OrderBook) SetObStatus(event *response.OrderBookStatusEvent) { this.mtf.SetObStatus(event) if this.orderbookstatusevent != nil { this.orderbookstatusevent("SystemUpdate", event) } } func (this *OrderBook) Init(event *InitEvent) { updated := this.mtf.Init(event) this.eventDispatch("SystemUpdate", updated) } //执行作为整体来更新 func (this *OrderBook) SetOneExecution(event *OneExecutionEvent) { updated := this.mtf.SetOneExecution(event) this.eventDispatch("SystemUpdate", updated) } func (this *OrderBook) SetPosition(event *response.PositionEvent) { this.mtf.SetPosition(event) if this.positionevent != nil { this.positionevent("SystemUpdate", event) } } func (this *OrderBook) SetAccount(account *response.AccountStateEvent) { this.mtf.SetAccount(account) if this.accountevent != nil { this.accountevent("SystemUpdate", account) } } func (this *OrderBook) SetOrder(event *response.OrderEvent) { this.mtf.SetOrder(event) this.executeEvent("SystemUpdate", event) } //SetExecution func (this *OrderBook) SetExecution(event *response.ExecutionEvent) { this.mtf.SetExecution(event) //this.executeEvent("SystemExecution", event) } //更新tick, 这个操作是最频繁的 func (this *OrderBook) UpdateTick(event *response.OrderBookEvent) { req := &TickEvent{} req.ob2 = event req.isCopy = true req.fetchUpdated = true updated := this.mtf.UpdateTick(req) //log.Println(updated) this.eventDispatch("TickUpdate", updated) } //货币对列表 //暂时不做更新 func (this *OrderBook) SetInstrument(event *response.Instrument) { err := this.mtf.SetInstrument(event) if err != nil { log.Println(err) return } } //暂时不做更新 func (this *OrderBook) SetExchangeRate(event *response.ExchangeRateEvent) { req := &RateEvent{} req.erate = event req.isCopy = true req.fetchUpdated = true updated := this.mtf.SetExchangeRate(req) this.eventDispatch("ExchangeRate", updated) } func (this *OrderBook) executeEvent(status string, event *response.OrderEvent) { id := event.InstructionId orderId := event.OrderId if this.orderevent != nil { this.orderevent(status, event) } events, newid := this.findEvent(id) if newid == "" { if instId, ok := this.orderIdToInstId[orderId]; ok { events, newid = this.findEvent(instId) } else { return } } else { this.orderIdToInstId[orderId] = newid } for i := 0; i < len(events); i++ { cb := events[i] cb(status, event) } } func (this *OrderBook) eventDispatch(status string, updated *AccountUpdated) { if updated == nil { log.Println("err eventDispatch") return } for i := 0; i < len(updated.orders); i++ { this.executeEvent(status, updated.orders[i]) } if this.positionevent != nil { for i := 0; i < len(updated.positions); i++ { this.positionevent(status, updated.positions[i]) } } if this.accountevent != nil && updated.account != nil { this.accountevent(status, updated.account) } } func (this *OrderBook) findEvent(id string) ([]func(string, *response.OrderEvent), string) { if events, ok := this.events[id]; ok { return events, id } return nil, "" } func (this *OrderBook) GetAccount() *response.AccountStateEvent { return this.mtf.GetAccount() } func (this *OrderBook) GetOrderBookEvent(id int64) *response.OrderBookEvent { return this.mtf.GetOb2(id) } func (this *OrderBook) GetPosition(id int64) *response.PositionEvent { return this.mtf.GetPosition(id) } func (this *OrderBook) GetPositions(iscopy bool) []*response.PositionEvent { return this.mtf.GetPositions(iscopy) } func (this *OrderBook) GetInstrument(id int64) *response.Instrument { return this.mtf.GetInstrument(id) } func (this *OrderBook) GetInstruments(iscopy bool) []*response.Instrument { return this.mtf.GetInstruments(iscopy) } //从mtf中接受各种各样的信息 func (this *OrderBook) Start() { // log.Println("go start()") this.mtf.Recv(func(msg *Message) { this.dispatch(msg) }) // log.Println("order book start end") } func (this *OrderBook) dispatch(msg *Message) { this.mu.Lock() defer this.mu.Unlock() //do some thing if msg.Err != nil { log.Println("msg error:", msg) return } if msg.Type == MsgSetTick && this.orderbookevent != nil { ob2 := msg.Data.(*AccountUpdated).Req.(*TickEvent).ob2 this.orderbookevent(ob2) return } if msg.Type == MsgSetExchangeRate || msg.Type == MsgSetInstrument { //内部信息,不对外发布 return } // log.Println("order book msg", msg) } func (this *OrderBook) GetOffsetByPoint(inst int64, point float64) (float64, error) { instrument := this.mtf.GetInstrument(inst) if instrument == nil { return 0, errors.New("OrderBook InstrumentId error.") } return instrument.PriceIncrement * point * 10, nil } //order book 交易辅助函数 func PositionTrackingInit(session *Session) error { //根据lmax的变化规则,一次执行总是以 order position accout 的顺序给出 //所以我合并上述四个监听为一个,这样保证整个执行能完成的更新到mtf。 //这是对思路方法的一个改进:初始化-->执行-->执行,更新着用户账户的变化 //上面三个是交易相关 session.RegisterInitEvent(func(session *Session, event *InitEvent) { session.GetOrderBook().Init(event) }) session.RegisterOneExecutionEvent(func(s *Session, event *OneExecutionEvent) { PrintStruct(event) session.GetOrderBook().SetOneExecution(event) }) session.RegisterInstructionRejectedEvent(func(session *Session, event *response.InstructionRejectedEvent) { session.GetOrderBook().SetRejected(event) }) //下面三个是行情相关 session.RegisterOrderBookEvent(func(s *Session, event *response.OrderBookEvent) { if len(event.AskPrices) == 0 || len(event.BidPrices) == 0 { log.Println("err tick", event) return } session.GetOrderBook().UpdateTick(event) }) session.RegisterOrderBookStatusEvent(func(s *Session, event *response.OrderBookStatusEvent) { session.GetOrderBook().SetObStatus(event) }) session.RegisterExchangeRateEvent(func(session *Session, event *response.ExchangeRateEvent) { session.GetOrderBook().SetExchangeRate(event) }) return nil } func PositionTrackingSub(session *Session) error { subs := NewMultiSubscribe() session.LoadAllInstruments(func(value *response.Instrument) { session.GetOrderBook().SetInstrument(value) subs.Add(request.NewOrderBookSubscriptionRequest(value.Id)) if value.Currency != "USD" { subs.Add(request.NewExchangeRateRequest(value.Currency, "USD")) } subs.Add(request.NewInstrumentRequest(value.Id)) subs.Add(request.NewOrderBookStatusRequest(value.Id)) }) subs.Add(request.NewPositionSubscriptionRequest()) subs.Add(request.NewOrderSubscriptionRequest()) subs.Add(request.NewAccountSubscriptionRequest()) return session.Subscribe(subs, nil) }