package lmaxapi

import (
	"errors"
	"log"
	"strings"

	"tickserver/api/lmaxapi/response"
	"tickserver/api/lmaxapi/util"
)

// Keep the log import referenced while the trace logging is commented out.
var _ = log.Println

// Listener callback types, one per kind of event delivered to a session.
type OrderBookListener func(session *Session, event *response.OrderBookEvent)
type OrderBookStatusListener func(session *Session, event *response.OrderBookStatusEvent)
type OrderListener func(session *Session, event *response.OrderEvent)
type StreamFailureListener func(session *Session, event error)
type InstructionRejectedListener func(session *Session, event *response.InstructionRejectedEvent)
type AccountStateListener func(session *Session, event *response.AccountStateEvent)
type ExecutionListener func(session *Session, event *response.ExecutionEvent)
type HeartbeatListener func(session *Session, accountId int64, token string)
type ExchangeRateListener func(session *Session, event *response.ExchangeRateEvent)
type PositionListener func(session *Session, event *response.PositionEvent)
type SessionDisconnectedListener func(session *Session)
type HistoricMarketDataListener func(session *Session, event *response.HistoricMarketDataEvent)
type InitListener func(session *Session, event *InitEvent)
type OneExecutionListener func(session *Session, event *OneExecutionEvent)

// EventHandler parses event data and dispatches it to the registered listeners.
type EventHandler struct {
	EventSession           *Session
	OnOrderBookEvent       OrderBookListener
	OnOrderEvent           OrderListener
	OnOrderBookStatusEvent OrderBookStatusListener
	OnAccountStateEvent    AccountStateListener
	OnExecutionEvent       ExecutionListener
	OnHeartbeatEvent       HeartbeatListener
	OnPositionEvent        PositionListener
	OnStreamFailure        StreamFailureListener
	OnInstructionRejected  InstructionRejectedListener
	OnSessionDisconnected  SessionDisconnectedListener
	OnHistoricEvent        HistoricMarketDataListener
	OnExchangeRateEvent    ExchangeRateListener
	OnInit                 InitListener
	OnOneExecution         OneExecutionListener
	oneExecution           *OneExecutionSM
}

// States of the OneExecutionSM state machine.
const (
	oneExecutionInit = iota
	oneExecutionOrder
	oneExecutionPosition
	oneExecutionAccount
)

// InitEvent holds the initialization information.
type InitEvent struct {
	OrderBook     []*response.OrderBookEvent
	ExchangeRate  []*response.ExchangeRateEvent
	Position      []*response.PositionEvent
	Order         []*response.OrderEvent
	Account       *response.AccountStateEvent
	AccountDetail *response.AccountDetails
}

// OneExecutionEvent groups the events of a single server update.
// An update normally arrives in this order: order, position, account.
// Together they form one unit and must not be split into parts.
type OneExecutionEvent struct {
	Order    []*response.OrderEvent
	Position []*response.PositionEvent
	Account  *response.AccountStateEvent
}

// OneExecutionSM is the state machine that assembles a OneExecutionEvent.
type OneExecutionSM struct {
	current OneExecutionEvent
	status  int
	handle  *EventHandler
}

// SetState feeds one event into the state machine. An event that arrives
// out of order resets the machine and returns an error.
func (this *OneExecutionSM) SetState(state interface{}) error {
	switch event := state.(type) {
	case *response.OrderEvent:
		// log.Println("SetState::OrderEvent", state)
		if this.status != oneExecutionInit && this.status != oneExecutionOrder {
			this.reset()
			return errors.New("OneExecutionSM::SetState order error")
		}
		this.status = oneExecutionOrder
		this.current.Order = append(this.current.Order, event)
	case *response.PositionEvent:
		// log.Println("SetState::PositionEvent", state)
		if this.status != oneExecutionOrder && this.status != oneExecutionPosition {
			this.reset()
			return errors.New("OneExecutionSM::SetState position error")
		}
		this.status = oneExecutionPosition
		this.current.Position = append(this.current.Position, event)
	case *response.AccountStateEvent:
		// log.Println("SetState::AccountStateEvent", state)
		if this.status != oneExecutionPosition && this.status != oneExecutionOrder {
			this.reset()
			return errors.New("OneExecutionSM::SetState account error")
		}
		this.status = oneExecutionAccount
		this.current.Account = event
		this.sendEvent()
		this.reset()
	case *response.OrderBookEvent:
		if this.status == oneExecutionInit {
			return nil
		}
		// log.Println("SetState::FlushOb2", state)
		// Sometimes only order events arrive, with no account or position
		// event; use the price update to flush the pending events.
		this.sendEvent()
		this.reset()
	}
	return nil
}

// reset clears the collected events and returns to the initial state.
func (this *OneExecutionSM) reset() {
	this.current = OneExecutionEvent{}
	this.status = oneExecutionInit
}

// sendEvent passes a copy of the collected events to the OnOneExecution listener.
func (this *OneExecutionSM) sendEvent() {
	tmp := this.current
	if this.handle.OnOneExecution != nil {
		this.handle.OnOneExecution(this.handle.EventSession, &tmp)
	}
}

// HandleEventData parses bodyData node by node and dispatches each event.
// When isinit is true, the events are also collected into an InitEvent that
// is delivered through HandleInitEvent once parsing finishes.
func (this *EventHandler) HandleEventData(bodyData string, isinit bool) {
	init := InitEvent{}
	init.AccountDetail = this.EventSession.GetAccountDetails()
	if isinit {
		//tracelog.Println("init EventHandler:", bodyData)
		this.oneExecution.reset()
	}
	var eventData, name string
	if strings.Index(bodyData, "") == 0 {
		bodyData = util.ParseXmlNameData(bodyData, []string{"events", "body"})
	}
	for {
		name, eventData, bodyData = util.ParseXmlNode(bodyData)
		if name == "" {
			break
		}
		switch name {
		case "events":
			this.HandleEventData(util.ParseXmlNameData(eventData, []string{"events", "body"}), isinit)
		case "ob2":
			event := this.HandleOrderBookEvent(eventData)
			if !isinit {
				this.oneExecution.SetState(event)
			}
			if isinit && event != nil {
				init.OrderBook = append(init.OrderBook, event)
			}
		case "exchangeRate":
			event := this.HandleExchangeRateEvent(eventData)
			if isinit && event != nil {
				init.ExchangeRate = append(init.ExchangeRate, event)
			}
		case "instructionRejected":
			this.HandleRejectedEvent(eventData)
		case "accountState":
			event := this.HandleAccountStateEvent(eventData)
			if !isinit {
				err := this.oneExecution.SetState(event)
				if err != nil {
					//tracelog.Println("SetState", err)
				}
			}
			if isinit && event != nil {
				init.Account = event
			}
		case "order":
			events := this.HandleOrderEvent(eventData)
			if !isinit {
				for i := 0; i < len(events); i++ {
					err := this.oneExecution.SetState(events[i])
					if err != nil {
						//tracelog.Println("SetState:", err)
					}
				}
			}
			if isinit && events != nil {
				init.Order = append(init.Order, events...)
			}
		case "orderBookStatus":
			this.HandleOrderBookStatusEvent(eventData)
		case "heartbeat":
			this.HandleHeartbeatEvent(eventData)
		case "position":
			events := this.HandlePositionEvent(eventData)
			if !isinit {
				for i := 0; i < len(events); i++ {
					err := this.oneExecution.SetState(events[i])
					if err != nil {
						//tracelog.Println("SetState:", err)
					}
				}
			}
			if isinit && events != nil {
				init.Position = append(init.Position, events...)
			}
		case "historicMarketData":
			this.HandleHistoricEvent(eventData)
		case "orders":
			data := util.ParseXmlNameData(eventData, []string{name, "page"})
			events := this.HandleOrderEvent(data)
			if isinit && events != nil {
				init.Order = append(init.Order, events...)
			}
		case "positions":
			data := util.ParseXmlNameData(eventData, []string{name, "page"})
			events := this.HandlePositionEvent(data)
			if isinit && events != nil {
				init.Position = append(init.Position, events...)
			}
		default:
			//tracelog.Println("default EventHandler:", name, eventData)
		}
		//tracelog.Println(name, "end")
	}
	if isinit {
		//tracelog.Println("init beg")
		this.HandleInitEvent(&init)
		//tracelog.Println("init end")
	}
}

// HandleEventError reports stream errors to the OnStreamFailure listener.
func (this *EventHandler) HandleEventError(err error) {
	// Use the comma-ok form so an error of another type does not panic.
	opErr, ok := err.(*OpError)
	if !ok {
		return
	}
	if strings.ToLower(opErr.Op) == "stream" {
		if this.OnStreamFailure != nil {
			this.OnStreamFailure(this.EventSession, err)
		}
	}
}

func (this *EventHandler) HandleInitEvent(event *InitEvent) {
	if this.OnInit != nil {
		this.OnInit(this.EventSession, event)
	}
}

func (this *EventHandler) HandleEventSessionDisconnected() {
	if this.OnSessionDisconnected != nil {
		this.OnSessionDisconnected(this.EventSession)
	}
}

func (this *EventHandler) HandleExchangeRateEvent(data string) *response.ExchangeRateEvent {
	event := response.NewExchangeRateEvent(data)
	if event != nil && this.OnExchangeRateEvent != nil {
		this.OnExchangeRateEvent(this.EventSession, event)
	}
	return event
}

func (this *EventHandler) HandleAccountStateEvent(data string) *response.AccountStateEvent {
	event := response.NewAccountStateEvent(data)
	if event != nil && this.OnAccountStateEvent != nil {
		this.OnAccountStateEvent(this.EventSession, event)
	}
	return event
}

func (this *EventHandler) HandleHeartbeatEvent(data string) {
	event := response.NewHeartbeatEvent(data)
	if event != nil && this.OnHeartbeatEvent != nil {
		this.OnHeartbeatEvent(this.EventSession, event.AccountId, event.Token)
	}
}

func (this *EventHandler) HandleOrderBookEvent(data string) *response.OrderBookEvent {
	if bodyData := util.ParseXmlNameData(data, []string{"ob2"}); bodyData != "" {
		event := response.NewOrderBookEvent(bodyData)
		// Guard against a nil event, as the other handlers do.
		if event != nil && this.OnOrderBookEvent != nil {
			this.OnOrderBookEvent(this.EventSession, event)
		}
		return event
	}
	return nil
}

func (this *EventHandler) HandleOrderBookStatusEvent(data string) {
	event := response.NewOrderBookStatusEvent(data)
	if event != nil && this.OnOrderBookStatusEvent != nil {
		this.OnOrderBookStatusEvent(this.EventSession, event)
	}
}

// HandleOrderEvent parses every "order" node in data, notifies the order and
// execution listeners, and returns the parsed events.
func (this *EventHandler) HandleOrderEvent(data string) (result []*response.OrderEvent) {
	var name, eventData string
	for {
		name, eventData, data = util.ParseXmlNode(data)
		if name == "" {
			break
		}
		if name == "order" {
			event, execution := response.NewOrderEvent(eventData)
			if event == nil {
				// Skip nil events so callers never receive nil entries.
				continue
			}
			result = append(result, event)
			if execution != nil && this.OnExecutionEvent != nil && event.IsExecution() {
				this.OnExecutionEvent(this.EventSession, execution)
			}
			if this.OnOrderEvent != nil {
				this.OnOrderEvent(this.EventSession, event)
			}
		}
	}
	return
}

// HandlePositionEvent parses every "position" node in data, notifies the
// position listener, and returns the parsed events.
func (this *EventHandler) HandlePositionEvent(data string) (result []*response.PositionEvent) {
	var name, eventData string
	for {
		name, eventData, data = util.ParseXmlNode(data)
		if name == "" {
			break
		}
		if name == "position" {
			event := response.NewPositionEvent(eventData)
			if event == nil {
				// Skip nil events so callers never receive nil entries.
				continue
			}
			result = append(result, event)
			if this.OnPositionEvent != nil {
				this.OnPositionEvent(this.EventSession, event)
			}
		}
	}
	return
}

func (this *EventHandler) HandleRejectedEvent(data string) {
	event := response.NewInstructionRejectedEvent(data)
	if event != nil && this.OnInstructionRejected != nil {
		this.OnInstructionRejected(this.EventSession, event)
	}
}

func (this *EventHandler) HandleHistoricEvent(data string) {
	event := response.NewHistoricMarketData(data)
	if event != nil && this.OnHistoricEvent != nil {
		this.OnHistoricEvent(this.EventSession, event)
	}
}

// NewEventHandler creates an EventHandler bound to session, with its
// one-execution state machine in the initial state.
func NewEventHandler(session *Session) *EventHandler {
	eventHandler := &EventHandler{EventSession: session}
	eventHandler.oneExecution = &OneExecutionSM{
		status: oneExecutionInit,
		handle: eventHandler,
	}
	return eventHandler
}