123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- package lmaxapi
- import "tickserver/api/lmaxapi/request"
- import "tickserver/api/lmaxapi/response"
- import "sync"
- import "reflect"
- import "time"
- import "log"
- import "errors"
- //会发生一个线程写,一个线程读的情况,暂时先全部加锁
- type OrderBook struct {
- mu sync.Mutex
- orderevent func(string, *response.OrderEvent)
- positionevent func(string, *response.PositionEvent)
- orderbookstatusevent func(string, *response.OrderBookStatusEvent)
- rejectedevent func(string, *response.InstructionRejectedEvent)
- rejectedevents map[string]func(string, *response.InstructionRejectedEvent)
- accountevent func(string, *response.AccountStateEvent)
- orderbookevent func(*response.OrderBookEvent)
- events map[string][]func(string, *response.OrderEvent)
- orderIdToInstId map[string]string
- mtf *MtfClient
- }
- //在账户登出又重新登录的情况下,我们需要做特殊的处理
- //价格还是简单的被重写
- //accountStatus 也可以简单的被set
- //但是order 和 position 要进行下面的操作:
- //增加一个init事件,这个事件是用户的一个快照。<seq>0</seq>的事件
- func NewOrderBook(mtf *Mtf) *OrderBook {
- //log.Println("NewOrderBook")
- ob := &OrderBook{}
- ob.events = make(map[string][]func(string, *response.OrderEvent))
- ob.orderIdToInstId = make(map[string]string)
- ob.rejectedevents = make(map[string]func(string, *response.InstructionRejectedEvent))
- ob.mtf = NewMtfClient(mtf, time.Second, false, 1024)
- go ob.Start()
- err := ob.mtf.ConnectPrivate()
- if err != nil {
- log.Println(err)
- return nil
- }
- err = ob.mtf.ConnectPublic()
- if err != nil {
- log.Println(err)
- return nil
- }
- return ob
- }
- func (this *OrderBook) Stop() {
- this.mtf.Close()
- this.mtf.GetMtf().Close()
- }
- //如果在多线程环境下,或者要修改,那么要采用copy模式
- func (this *OrderBook) GetOrders(iscopy bool) []*response.OrderEvent {
- return this.GetOrdersUnsafe(iscopy)
- }
- func (this *OrderBook) GetOrdersUnsafe(iscopy bool) []*response.OrderEvent {
- return this.mtf.GetOrders(iscopy)
- }
- //对某个订单添加监听事件
- func (this *OrderBook) AddEvent(instructId string, cb func(status string, event *response.OrderEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- //查找相关的事件 instrunctId
- events, id := this.findEvent(instructId)
- orders := this.GetOrders(true)
- if id == "" { //如果没有找到,那么添加事件
- id = instructId
- for i := 0; i < len(orders); i++ {
- if orders[i].InstructionId == instructId {
- this.orderIdToInstId[orders[i].OrderId] = instructId
- break
- }
- }
- }
- for i := 0; i < len(events); i++ {
- if reflect.ValueOf(cb).Pointer() == reflect.ValueOf(events[i]).Pointer() {
- return
- }
- }
- this.events[id] = append(events, cb)
- }
- func (this *OrderBook) AddRejectEvent(instructId string, cb func(status string, event *response.InstructionRejectedEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- this.rejectedevents[instructId] = cb
- }
- func (this *OrderBook) SetOrderBookEvent(cb func(event *response.OrderBookEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- this.orderbookevent = cb
- }
- func (this *OrderBook) ClearEvent(instrunctId string, cb func(status string, event *response.OrderEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- events, id := this.findEvent(instrunctId)
- if id == "" { //如果没有找到,那么无法删除事件
- return
- }
- for i := 0; i < len(events); i++ {
- if reflect.ValueOf(cb).Pointer() == reflect.ValueOf(events[i]).Pointer() {
- events = append(events[:i], events[i+1:]...)
- break
- }
- }
- this.events[id] = events
- }
- func (this *OrderBook) SetOrderEvent(cb func(status string, event *response.OrderEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- this.orderevent = cb
- }
- func (this *OrderBook) SetRejectedEvent(cb func(status string, event *response.InstructionRejectedEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- this.rejectedevent = cb
- }
- func (this *OrderBook) SetOrderBookStatusEvent(cb func(status string, event *response.OrderBookStatusEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- this.orderbookstatusevent = cb
- }
- func (this *OrderBook) SetPositionEvent(cb func(status string, event *response.PositionEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- this.positionevent = cb
- }
- func (this *OrderBook) SetAccountEvent(cb func(status string, event *response.AccountStateEvent)) {
- this.mu.Lock()
- defer this.mu.Unlock()
- this.accountevent = cb
- }
- func (this *OrderBook) SetRejected(event *response.InstructionRejectedEvent) {
- this.mtf.SetRejected(event)
- if this.rejectedevent != nil {
- this.rejectedevent("SystemUpdate", event)
- }
- if cb, ok := this.rejectedevents[event.InstructionId]; ok {
- cb("SystemUpdate", event)
- delete(this.rejectedevents, event.InstructionId)
- }
- }
- func (this *OrderBook) SetObStatus(event *response.OrderBookStatusEvent) {
- this.mtf.SetObStatus(event)
- if this.orderbookstatusevent != nil {
- this.orderbookstatusevent("SystemUpdate", event)
- }
- }
- func (this *OrderBook) Init(event *InitEvent) {
- updated := this.mtf.Init(event)
- this.eventDispatch("SystemUpdate", updated)
- }
- //执行作为整体来更新
- func (this *OrderBook) SetOneExecution(event *OneExecutionEvent) {
- updated := this.mtf.SetOneExecution(event)
- this.eventDispatch("SystemUpdate", updated)
- }
- func (this *OrderBook) SetPosition(event *response.PositionEvent) {
- this.mtf.SetPosition(event)
- if this.positionevent != nil {
- this.positionevent("SystemUpdate", event)
- }
- }
- func (this *OrderBook) SetAccount(account *response.AccountStateEvent) {
- this.mtf.SetAccount(account)
- if this.accountevent != nil {
- this.accountevent("SystemUpdate", account)
- }
- }
- func (this *OrderBook) SetOrder(event *response.OrderEvent) {
- this.mtf.SetOrder(event)
- this.executeEvent("SystemUpdate", event)
- }
- //SetExecution
- func (this *OrderBook) SetExecution(event *response.ExecutionEvent) {
- this.mtf.SetExecution(event)
- //this.executeEvent("SystemExecution", event)
- }
- //更新tick, 这个操作是最频繁的
- func (this *OrderBook) UpdateTick(event *response.OrderBookEvent) {
- req := &TickEvent{}
- req.ob2 = event
- req.isCopy = true
- req.fetchUpdated = true
- updated := this.mtf.UpdateTick(req)
- //log.Println(updated)
- this.eventDispatch("TickUpdate", updated)
- }
- //货币对列表
- //暂时不做更新
- func (this *OrderBook) SetInstrument(event *response.Instrument) {
- err := this.mtf.SetInstrument(event)
- if err != nil {
- log.Println(err)
- return
- }
- }
- //暂时不做更新
- func (this *OrderBook) SetExchangeRate(event *response.ExchangeRateEvent) {
- req := &RateEvent{}
- req.erate = event
- req.isCopy = true
- req.fetchUpdated = true
- updated := this.mtf.SetExchangeRate(req)
- this.eventDispatch("ExchangeRate", updated)
- }
- func (this *OrderBook) executeEvent(status string, event *response.OrderEvent) {
- id := event.InstructionId
- orderId := event.OrderId
- if this.orderevent != nil {
- this.orderevent(status, event)
- }
- events, newid := this.findEvent(id)
- if newid == "" {
- if instId, ok := this.orderIdToInstId[orderId]; ok {
- events, newid = this.findEvent(instId)
- } else {
- return
- }
- } else {
- this.orderIdToInstId[orderId] = newid
- }
- for i := 0; i < len(events); i++ {
- cb := events[i]
- cb(status, event)
- }
- }
- func (this *OrderBook) eventDispatch(status string, updated *AccountUpdated) {
- if updated == nil {
- log.Println("err eventDispatch")
- return
- }
- for i := 0; i < len(updated.orders); i++ {
- this.executeEvent(status, updated.orders[i])
- }
- if this.positionevent != nil {
- for i := 0; i < len(updated.positions); i++ {
- this.positionevent(status, updated.positions[i])
- }
- }
- if this.accountevent != nil && updated.account != nil {
- this.accountevent(status, updated.account)
- }
- }
- func (this *OrderBook) findEvent(id string) ([]func(string, *response.OrderEvent), string) {
- if events, ok := this.events[id]; ok {
- return events, id
- }
- return nil, ""
- }
- func (this *OrderBook) GetAccount() *response.AccountStateEvent {
- return this.mtf.GetAccount()
- }
- func (this *OrderBook) GetOrderBookEvent(id int64) *response.OrderBookEvent {
- return this.mtf.GetOb2(id)
- }
- func (this *OrderBook) GetPosition(id int64) *response.PositionEvent {
- return this.mtf.GetPosition(id)
- }
- func (this *OrderBook) GetPositions(iscopy bool) []*response.PositionEvent {
- return this.mtf.GetPositions(iscopy)
- }
- func (this *OrderBook) GetInstrument(id int64) *response.Instrument {
- return this.mtf.GetInstrument(id)
- }
- func (this *OrderBook) GetInstruments(iscopy bool) []*response.Instrument {
- return this.mtf.GetInstruments(iscopy)
- }
- //从mtf中接受各种各样的信息
- func (this *OrderBook) Start() {
- // log.Println("go start()")
- this.mtf.Recv(func(msg *Message) {
- this.dispatch(msg)
- })
- // log.Println("order book start end")
- }
- func (this *OrderBook) dispatch(msg *Message) {
- this.mu.Lock()
- defer this.mu.Unlock()
- //do some thing
- if msg.Err != nil {
- log.Println("msg error:", msg)
- return
- }
- if msg.Type == MsgSetTick && this.orderbookevent != nil {
- ob2 := msg.Data.(*AccountUpdated).Req.(*TickEvent).ob2
- this.orderbookevent(ob2)
- return
- }
- if msg.Type == MsgSetExchangeRate || msg.Type == MsgSetInstrument {
- //内部信息,不对外发布
- return
- }
- // log.Println("order book msg", msg)
- }
- func (this *OrderBook) GetOffsetByPoint(inst int64, point float64) (float64, error) {
- instrument := this.mtf.GetInstrument(inst)
- if instrument == nil {
- return 0, errors.New("OrderBook InstrumentId error.")
- }
- return instrument.PriceIncrement * point * 10, nil
- }
- //order book 交易辅助函数
- func PositionTrackingInit(session *Session) error {
- //根据lmax的变化规则,一次执行总是以 order position accout 的顺序给出
- //所以我合并上述四个监听为一个,这样保证整个执行能完成的更新到mtf。
- //这是对思路方法的一个改进:初始化-->执行-->执行,更新着用户账户的变化
- //上面三个是交易相关
- session.RegisterInitEvent(func(session *Session, event *InitEvent) {
- session.GetOrderBook().Init(event)
- })
- session.RegisterOneExecutionEvent(func(s *Session, event *OneExecutionEvent) {
- PrintStruct(event)
- session.GetOrderBook().SetOneExecution(event)
- })
- session.RegisterInstructionRejectedEvent(func(session *Session, event *response.InstructionRejectedEvent) {
- session.GetOrderBook().SetRejected(event)
- })
- //下面三个是行情相关
- session.RegisterOrderBookEvent(func(s *Session, event *response.OrderBookEvent) {
- if len(event.AskPrices) == 0 || len(event.BidPrices) == 0 {
- log.Println("err tick", event)
- return
- }
- session.GetOrderBook().UpdateTick(event)
- })
- session.RegisterOrderBookStatusEvent(func(s *Session, event *response.OrderBookStatusEvent) {
- session.GetOrderBook().SetObStatus(event)
- })
- session.RegisterExchangeRateEvent(func(session *Session, event *response.ExchangeRateEvent) {
- session.GetOrderBook().SetExchangeRate(event)
- })
- return nil
- }
- func PositionTrackingSub(session *Session) error {
- subs := NewMultiSubscribe()
- session.LoadAllInstruments(func(value *response.Instrument) {
- session.GetOrderBook().SetInstrument(value)
- subs.Add(request.NewOrderBookSubscriptionRequest(value.Id))
- if value.Currency != "USD" {
- subs.Add(request.NewExchangeRateRequest(value.Currency, "USD"))
- }
- subs.Add(request.NewInstrumentRequest(value.Id))
- subs.Add(request.NewOrderBookStatusRequest(value.Id))
- })
- subs.Add(request.NewPositionSubscriptionRequest())
- subs.Add(request.NewOrderSubscriptionRequest())
- subs.Add(request.NewAccountSubscriptionRequest())
- return session.Subscribe(subs, nil)
- }
|