123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- package lmaxapi
- import (
- "errors"
- "tickserver/api/lmaxapi/response"
- "tickserver/api/lmaxapi/util"
- "log"
- "strings"
- )
- var _ = log.Println
- type OrderBookListener func(session *Session, event *response.OrderBookEvent)
- type OrderBookStatusListener func(session *Session, event *response.OrderBookStatusEvent)
- type OrderListener func(session *Session, event *response.OrderEvent)
- type StreamFailureListener func(session *Session, event error)
- type InstructionRejectedListener func(session *Session, event *response.InstructionRejectedEvent)
- type AccountStateListener func(session *Session, event *response.AccountStateEvent)
- type ExecutionListener func(session *Session, event *response.ExecutionEvent)
- type HeartbeatListener func(session *Session, accountId int64, token string)
- type ExchangeRateListener func(session *Session, event *response.ExchangeRateEvent)
- type PositionListener func(session *Session, event *response.PositionEvent)
- type SessionDisconnectedListener func(session *Session)
- type HistoricMarketDataListener func(session *Session, event *response.HistoricMarketDataEvent)
- type InitListener func(session *Session, event *InitEvent)
- type OneExecutionListener func(session *Session, event *OneExecutionEvent)
- type EventHandler struct {
- EventSession *Session
- OnOrderBookEvent OrderBookListener
- OnOrderEvent OrderListener
- OnOrderBookStatusEvent OrderBookStatusListener
- OnAccountStateEvent AccountStateListener
- OnExecutionEvent ExecutionListener
- OnHeartbeatEvent HeartbeatListener
- OnPositionEvent PositionListener
- OnStreamFailure StreamFailureListener
- OnInstructionRejected InstructionRejectedListener
- OnSessionDisconnected SessionDisconnectedListener
- OnHistoricEvent HistoricMarketDataListener
- OnExchangeRateEvent ExchangeRateListener
- OnInit InitListener
- OnOneExecution OneExecutionListener
- oneExecution *OneExecutionSM
- }
// States of OneExecutionSM, in the order a batch normally progresses.
const (
	// oneExecutionInit: no batch is in progress.
	oneExecutionInit = iota
	// oneExecutionOrder: at least one order event has been collected.
	oneExecutionOrder
	// oneExecutionPosition: at least one position event has been collected.
	oneExecutionPosition
	// oneExecutionAccount: the account state arrived, completing the batch.
	oneExecutionAccount
)
- //初始化信息
- type InitEvent struct {
- OrderBook []*response.OrderBookEvent
- ExchangeRate []*response.ExchangeRateEvent
- Position []*response.PositionEvent
- Order []*response.OrderEvent
- Account *response.AccountStateEvent
- AccountDetail *response.AccountDetails
- }
- //一般服务器的一次更新都是以这样的顺序:order position account
- //他们是一个整体,我们不能割裂为部分。
- type OneExecutionEvent struct {
- Order []*response.OrderEvent
- Position []*response.PositionEvent
- Account *response.AccountStateEvent
- }
- type OneExecutionSM struct {
- current OneExecutionEvent
- status int
- handle *EventHandler
- }
- func (this *OneExecutionSM) SetState(state interface{}) error {
- switch state.(type) {
- case *response.OrderEvent:
- // log.Println("SetState::OrderEvent", state)
- if this.status != oneExecutionInit && this.status != oneExecutionOrder {
- this.reset()
- return errors.New("OneExecutionSM::SetState order error")
- }
- this.status = oneExecutionOrder
- this.current.Order = append(this.current.Order, state.(*response.OrderEvent))
- case *response.PositionEvent:
- // log.Println("SetState::PositionEvent", state)
- if this.status != oneExecutionOrder && this.status != oneExecutionPosition {
- this.reset()
- return errors.New("OneExecutionSM::SetState position error")
- }
- this.status = oneExecutionPosition
- this.current.Position = append(this.current.Position, state.(*response.PositionEvent))
- case *response.AccountStateEvent:
- // log.Println("SetState::AccountStateEvent", state)
- if this.status != oneExecutionPosition && this.status != oneExecutionOrder {
- this.reset()
- return errors.New("OneExecutionSM::SetState account error")
- }
- this.status = oneExecutionAccount
- this.current.Account = state.(*response.AccountStateEvent)
- this.sendEvent()
- this.reset()
- case *response.OrderBookEvent:
- if this.status == oneExecutionInit {
- return nil
- }
- // log.Println("SetState::FlushOb2", state)
- //有时只有order 没有 account 和 position,用价格来刷新
- this.sendEvent()
- this.reset()
- }
- return nil
- }
- func (this *OneExecutionSM) reset() {
- this.current = OneExecutionEvent{}
- this.status = oneExecutionInit
- }
- func (this *OneExecutionSM) sendEvent() {
- tmp := this.current
- if this.handle.OnOneExecution != nil {
- this.handle.OnOneExecution(this.handle.EventSession, &tmp)
- }
- }
- func (this *EventHandler) HandleEventData(bodyData string, isinit bool) {
- init := InitEvent{}
- init.AccountDetail = this.EventSession.GetAccountDetails()
- if isinit {
- //tracelog.Println("init EventHandler:", bodyData)
- this.oneExecution.reset()
- }
- var eventData, name string
- if strings.Index(bodyData, "<events>") == 0 {
- bodyData = util.ParseXmlNameData(bodyData, []string{"events", "body"})
- }
- for {
- name, eventData, bodyData = util.ParseXmlNode(bodyData)
- if name == "" {
- break
- }
- switch name {
- case "events":
- this.HandleEventData(util.ParseXmlNameData(eventData, []string{"events", "body"}), isinit)
- case "ob2":
- event := this.HandleOrderBookEvent(eventData)
- if !isinit {
- this.oneExecution.SetState(event)
- }
- if isinit && event != nil {
- init.OrderBook = append(init.OrderBook, event)
- }
- case "exchangeRate":
- event := this.HandleExchangeRateEvent(eventData)
- if isinit && event != nil {
- init.ExchangeRate = append(init.ExchangeRate, event)
- }
- case "instructionRejected":
- this.HandleRejectedEvent(eventData)
- case "accountState":
- event := this.HandleAccountStateEvent(eventData)
- if !isinit {
- err := this.oneExecution.SetState(event)
- if err != nil {
- //tracelog.Println("SetState", err)
- }
- }
- if isinit && event != nil {
- init.Account = event
- }
- case "order":
- events := this.HandleOrderEvent(eventData)
- if !isinit {
- for i := 0; i < len(events); i++ {
- err := this.oneExecution.SetState(events[i])
- if err != nil {
- //tracelog.Println("SetState:", err)
- }
- }
- }
- if isinit && events != nil {
- init.Order = append(init.Order, events...)
- }
- case "orderBookStatus":
- this.HandleOrderBookStatusEvent(eventData)
- case "heartbeat":
- this.HandleHeartbeatEvent(eventData)
- case "position":
- events := this.HandlePositionEvent(eventData)
- if !isinit {
- for i := 0; i < len(events); i++ {
- err := this.oneExecution.SetState(events[i])
- if err != nil {
- //tracelog.Println("SetState:", err)
- }
- }
- }
- if isinit && events != nil {
- init.Position = append(init.Position, events...)
- }
- case "historicMarketData":
- this.HandleHistoricEvent(eventData)
- case "orders":
- data := util.ParseXmlNameData(eventData, []string{name, "page"})
- events := this.HandleOrderEvent(data)
- if isinit && events != nil {
- init.Order = append(init.Order, events...)
- }
- case "positions":
- data := util.ParseXmlNameData(eventData, []string{name, "page"})
- events := this.HandlePositionEvent(data)
- if isinit && events != nil {
- init.Position = append(init.Position, events...)
- }
- default:
- //tracelog.Println("default EventHandler:", name, eventData)
- }
- //tracelog.Println(name, "end")
- }
- if isinit {
- //tracelog.Println("init beg")
- this.HandleInitEvent(&init)
- //tracelog.Println("init end")
- }
- }
- func (this *EventHandler) HandleEventError(err error) {
- if strings.ToLower(err.(*OpError).Op) == "stream" {
- if this.OnStreamFailure != nil {
- this.OnStreamFailure(this.EventSession, err)
- }
- }
- }
- func (this *EventHandler) HandleInitEvent(event *InitEvent) {
- if this.OnInit != nil {
- this.OnInit(this.EventSession, event)
- }
- }
- func (this *EventHandler) HandleEventSessionDisconnected() {
- if this.OnSessionDisconnected != nil {
- this.OnSessionDisconnected(this.EventSession)
- }
- }
- func (this *EventHandler) HandleExchangeRateEvent(data string) *response.ExchangeRateEvent {
- event := response.NewExchangeRateEvent(data)
- if event != nil && this.OnExchangeRateEvent != nil {
- this.OnExchangeRateEvent(this.EventSession, event)
- }
- return event
- }
- func (this *EventHandler) HandleAccountStateEvent(data string) *response.AccountStateEvent {
- event := response.NewAccountStateEvent(data)
- if event != nil && this.OnAccountStateEvent != nil {
- this.OnAccountStateEvent(this.EventSession, event)
- }
- return event
- }
- func (this *EventHandler) HandleHeartbeatEvent(data string) {
- event := response.NewHeartbeatEvent(data)
- if event != nil && this.OnHeartbeatEvent != nil {
- this.OnHeartbeatEvent(this.EventSession, event.AccountId, event.Token)
- }
- }
- func (this *EventHandler) HandleOrderBookEvent(data string) *response.OrderBookEvent {
- if bodyData := util.ParseXmlNameData(data, []string{"ob2"}); bodyData != "" {
- event := response.NewOrderBookEvent(bodyData)
- if this.OnOrderBookEvent != nil {
- this.OnOrderBookEvent(this.EventSession, event)
- }
- return event
- }
- return nil
- }
- func (this *EventHandler) HandleOrderBookStatusEvent(data string) {
- event := response.NewOrderBookStatusEvent(data)
- if event != nil && this.OnOrderBookStatusEvent != nil {
- this.OnOrderBookStatusEvent(this.EventSession, event)
- }
- }
- func (this *EventHandler) HandleOrderEvent(data string) (result []*response.OrderEvent) {
- var name, eventData string
- for {
- name, eventData, data = util.ParseXmlNode(data)
- if name == "" {
- break
- }
- if name == "order" {
- event, execution := response.NewOrderEvent(eventData)
- result = append(result, event)
- if execution != nil && this.OnExecutionEvent != nil && event.IsExecution() {
- this.OnExecutionEvent(this.EventSession, execution)
- }
- if event != nil && this.OnOrderEvent != nil {
- this.OnOrderEvent(this.EventSession, event)
- }
- }
- }
- return
- }
- func (this *EventHandler) HandlePositionEvent(data string) (result []*response.PositionEvent) {
- var name, eventData string
- for {
- name, eventData, data = util.ParseXmlNode(data)
- if name == "" {
- break
- }
- if name == "position" {
- event := response.NewPositionEvent(eventData)
- result = append(result, event)
- if event != nil && this.OnPositionEvent != nil {
- this.OnPositionEvent(this.EventSession, event)
- }
- }
- }
- return
- }
- func (this *EventHandler) HandleRejectedEvent(data string) {
- event := response.NewInstructionRejectedEvent(data)
- if event != nil && this.OnInstructionRejected != nil {
- this.OnInstructionRejected(this.EventSession, event)
- }
- }
- func (this *EventHandler) HandleHistoricEvent(data string) {
- event := response.NewHistoricMarketData(data)
- if event != nil && this.OnHistoricEvent != nil {
- this.OnHistoricEvent(this.EventSession, event)
- }
- }
- func NewEventHandler(session *Session) *EventHandler {
- eventHandler := EventHandler{}
- eventHandler.EventSession = session
- eventHandler.oneExecution = &OneExecutionSM{}
- eventHandler.oneExecution.status = oneExecutionInit
- eventHandler.oneExecution.handle = &eventHandler
- return &eventHandler
- }
|