EventHandler.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package lmaxapi
  2. import (
  3. "errors"
  4. "tickserver/api/lmaxapi/response"
  5. "tickserver/api/lmaxapi/util"
  6. "log"
  7. "strings"
  8. )
  9. var _ = log.Println
  10. type OrderBookListener func(session *Session, event *response.OrderBookEvent)
  11. type OrderBookStatusListener func(session *Session, event *response.OrderBookStatusEvent)
  12. type OrderListener func(session *Session, event *response.OrderEvent)
  13. type StreamFailureListener func(session *Session, event error)
  14. type InstructionRejectedListener func(session *Session, event *response.InstructionRejectedEvent)
  15. type AccountStateListener func(session *Session, event *response.AccountStateEvent)
  16. type ExecutionListener func(session *Session, event *response.ExecutionEvent)
  17. type HeartbeatListener func(session *Session, accountId int64, token string)
  18. type ExchangeRateListener func(session *Session, event *response.ExchangeRateEvent)
  19. type PositionListener func(session *Session, event *response.PositionEvent)
  20. type SessionDisconnectedListener func(session *Session)
  21. type HistoricMarketDataListener func(session *Session, event *response.HistoricMarketDataEvent)
  22. type InitListener func(session *Session, event *InitEvent)
  23. type OneExecutionListener func(session *Session, event *OneExecutionEvent)
  24. type EventHandler struct {
  25. EventSession *Session
  26. OnOrderBookEvent OrderBookListener
  27. OnOrderEvent OrderListener
  28. OnOrderBookStatusEvent OrderBookStatusListener
  29. OnAccountStateEvent AccountStateListener
  30. OnExecutionEvent ExecutionListener
  31. OnHeartbeatEvent HeartbeatListener
  32. OnPositionEvent PositionListener
  33. OnStreamFailure StreamFailureListener
  34. OnInstructionRejected InstructionRejectedListener
  35. OnSessionDisconnected SessionDisconnectedListener
  36. OnHistoricEvent HistoricMarketDataListener
  37. OnExchangeRateEvent ExchangeRateListener
  38. OnInit InitListener
  39. OnOneExecution OneExecutionListener
  40. oneExecution *OneExecutionSM
  41. }
  42. const (
  43. oneExecutionInit = iota
  44. oneExecutionOrder
  45. oneExecutionPosition
  46. oneExecutionAccount
  47. )
  48. //初始化信息
  49. type InitEvent struct {
  50. OrderBook []*response.OrderBookEvent
  51. ExchangeRate []*response.ExchangeRateEvent
  52. Position []*response.PositionEvent
  53. Order []*response.OrderEvent
  54. Account *response.AccountStateEvent
  55. AccountDetail *response.AccountDetails
  56. }
  57. //一般服务器的一次更新都是以这样的顺序:order position account
  58. //他们是一个整体,我们不能割裂为部分。
  59. type OneExecutionEvent struct {
  60. Order []*response.OrderEvent
  61. Position []*response.PositionEvent
  62. Account *response.AccountStateEvent
  63. }
  64. type OneExecutionSM struct {
  65. current OneExecutionEvent
  66. status int
  67. handle *EventHandler
  68. }
  69. func (this *OneExecutionSM) SetState(state interface{}) error {
  70. switch state.(type) {
  71. case *response.OrderEvent:
  72. // log.Println("SetState::OrderEvent", state)
  73. if this.status != oneExecutionInit && this.status != oneExecutionOrder {
  74. this.reset()
  75. return errors.New("OneExecutionSM::SetState order error")
  76. }
  77. this.status = oneExecutionOrder
  78. this.current.Order = append(this.current.Order, state.(*response.OrderEvent))
  79. case *response.PositionEvent:
  80. // log.Println("SetState::PositionEvent", state)
  81. if this.status != oneExecutionOrder && this.status != oneExecutionPosition {
  82. this.reset()
  83. return errors.New("OneExecutionSM::SetState position error")
  84. }
  85. this.status = oneExecutionPosition
  86. this.current.Position = append(this.current.Position, state.(*response.PositionEvent))
  87. case *response.AccountStateEvent:
  88. // log.Println("SetState::AccountStateEvent", state)
  89. if this.status != oneExecutionPosition && this.status != oneExecutionOrder {
  90. this.reset()
  91. return errors.New("OneExecutionSM::SetState account error")
  92. }
  93. this.status = oneExecutionAccount
  94. this.current.Account = state.(*response.AccountStateEvent)
  95. this.sendEvent()
  96. this.reset()
  97. case *response.OrderBookEvent:
  98. if this.status == oneExecutionInit {
  99. return nil
  100. }
  101. // log.Println("SetState::FlushOb2", state)
  102. //有时只有order 没有 account 和 position,用价格来刷新
  103. this.sendEvent()
  104. this.reset()
  105. }
  106. return nil
  107. }
  108. func (this *OneExecutionSM) reset() {
  109. this.current = OneExecutionEvent{}
  110. this.status = oneExecutionInit
  111. }
  112. func (this *OneExecutionSM) sendEvent() {
  113. tmp := this.current
  114. if this.handle.OnOneExecution != nil {
  115. this.handle.OnOneExecution(this.handle.EventSession, &tmp)
  116. }
  117. }
  118. func (this *EventHandler) HandleEventData(bodyData string, isinit bool) {
  119. init := InitEvent{}
  120. init.AccountDetail = this.EventSession.GetAccountDetails()
  121. if isinit {
  122. //tracelog.Println("init EventHandler:", bodyData)
  123. this.oneExecution.reset()
  124. }
  125. var eventData, name string
  126. if strings.Index(bodyData, "<events>") == 0 {
  127. bodyData = util.ParseXmlNameData(bodyData, []string{"events", "body"})
  128. }
  129. for {
  130. name, eventData, bodyData = util.ParseXmlNode(bodyData)
  131. if name == "" {
  132. break
  133. }
  134. switch name {
  135. case "events":
  136. this.HandleEventData(util.ParseXmlNameData(eventData, []string{"events", "body"}), isinit)
  137. case "ob2":
  138. event := this.HandleOrderBookEvent(eventData)
  139. if !isinit {
  140. this.oneExecution.SetState(event)
  141. }
  142. if isinit && event != nil {
  143. init.OrderBook = append(init.OrderBook, event)
  144. }
  145. case "exchangeRate":
  146. event := this.HandleExchangeRateEvent(eventData)
  147. if isinit && event != nil {
  148. init.ExchangeRate = append(init.ExchangeRate, event)
  149. }
  150. case "instructionRejected":
  151. this.HandleRejectedEvent(eventData)
  152. case "accountState":
  153. event := this.HandleAccountStateEvent(eventData)
  154. if !isinit {
  155. err := this.oneExecution.SetState(event)
  156. if err != nil {
  157. //tracelog.Println("SetState", err)
  158. }
  159. }
  160. if isinit && event != nil {
  161. init.Account = event
  162. }
  163. case "order":
  164. events := this.HandleOrderEvent(eventData)
  165. if !isinit {
  166. for i := 0; i < len(events); i++ {
  167. err := this.oneExecution.SetState(events[i])
  168. if err != nil {
  169. //tracelog.Println("SetState:", err)
  170. }
  171. }
  172. }
  173. if isinit && events != nil {
  174. init.Order = append(init.Order, events...)
  175. }
  176. case "orderBookStatus":
  177. this.HandleOrderBookStatusEvent(eventData)
  178. case "heartbeat":
  179. this.HandleHeartbeatEvent(eventData)
  180. case "position":
  181. events := this.HandlePositionEvent(eventData)
  182. if !isinit {
  183. for i := 0; i < len(events); i++ {
  184. err := this.oneExecution.SetState(events[i])
  185. if err != nil {
  186. //tracelog.Println("SetState:", err)
  187. }
  188. }
  189. }
  190. if isinit && events != nil {
  191. init.Position = append(init.Position, events...)
  192. }
  193. case "historicMarketData":
  194. this.HandleHistoricEvent(eventData)
  195. case "orders":
  196. data := util.ParseXmlNameData(eventData, []string{name, "page"})
  197. events := this.HandleOrderEvent(data)
  198. if isinit && events != nil {
  199. init.Order = append(init.Order, events...)
  200. }
  201. case "positions":
  202. data := util.ParseXmlNameData(eventData, []string{name, "page"})
  203. events := this.HandlePositionEvent(data)
  204. if isinit && events != nil {
  205. init.Position = append(init.Position, events...)
  206. }
  207. default:
  208. //tracelog.Println("default EventHandler:", name, eventData)
  209. }
  210. //tracelog.Println(name, "end")
  211. }
  212. if isinit {
  213. //tracelog.Println("init beg")
  214. this.HandleInitEvent(&init)
  215. //tracelog.Println("init end")
  216. }
  217. }
  218. func (this *EventHandler) HandleEventError(err error) {
  219. if strings.ToLower(err.(*OpError).Op) == "stream" {
  220. if this.OnStreamFailure != nil {
  221. this.OnStreamFailure(this.EventSession, err)
  222. }
  223. }
  224. }
  225. func (this *EventHandler) HandleInitEvent(event *InitEvent) {
  226. if this.OnInit != nil {
  227. this.OnInit(this.EventSession, event)
  228. }
  229. }
  230. func (this *EventHandler) HandleEventSessionDisconnected() {
  231. if this.OnSessionDisconnected != nil {
  232. this.OnSessionDisconnected(this.EventSession)
  233. }
  234. }
  235. func (this *EventHandler) HandleExchangeRateEvent(data string) *response.ExchangeRateEvent {
  236. event := response.NewExchangeRateEvent(data)
  237. if event != nil && this.OnExchangeRateEvent != nil {
  238. this.OnExchangeRateEvent(this.EventSession, event)
  239. }
  240. return event
  241. }
  242. func (this *EventHandler) HandleAccountStateEvent(data string) *response.AccountStateEvent {
  243. event := response.NewAccountStateEvent(data)
  244. if event != nil && this.OnAccountStateEvent != nil {
  245. this.OnAccountStateEvent(this.EventSession, event)
  246. }
  247. return event
  248. }
  249. func (this *EventHandler) HandleHeartbeatEvent(data string) {
  250. event := response.NewHeartbeatEvent(data)
  251. if event != nil && this.OnHeartbeatEvent != nil {
  252. this.OnHeartbeatEvent(this.EventSession, event.AccountId, event.Token)
  253. }
  254. }
  255. func (this *EventHandler) HandleOrderBookEvent(data string) *response.OrderBookEvent {
  256. if bodyData := util.ParseXmlNameData(data, []string{"ob2"}); bodyData != "" {
  257. event := response.NewOrderBookEvent(bodyData)
  258. if this.OnOrderBookEvent != nil {
  259. this.OnOrderBookEvent(this.EventSession, event)
  260. }
  261. return event
  262. }
  263. return nil
  264. }
  265. func (this *EventHandler) HandleOrderBookStatusEvent(data string) {
  266. event := response.NewOrderBookStatusEvent(data)
  267. if event != nil && this.OnOrderBookStatusEvent != nil {
  268. this.OnOrderBookStatusEvent(this.EventSession, event)
  269. }
  270. }
  271. func (this *EventHandler) HandleOrderEvent(data string) (result []*response.OrderEvent) {
  272. var name, eventData string
  273. for {
  274. name, eventData, data = util.ParseXmlNode(data)
  275. if name == "" {
  276. break
  277. }
  278. if name == "order" {
  279. event, execution := response.NewOrderEvent(eventData)
  280. result = append(result, event)
  281. if execution != nil && this.OnExecutionEvent != nil && event.IsExecution() {
  282. this.OnExecutionEvent(this.EventSession, execution)
  283. }
  284. if event != nil && this.OnOrderEvent != nil {
  285. this.OnOrderEvent(this.EventSession, event)
  286. }
  287. }
  288. }
  289. return
  290. }
  291. func (this *EventHandler) HandlePositionEvent(data string) (result []*response.PositionEvent) {
  292. var name, eventData string
  293. for {
  294. name, eventData, data = util.ParseXmlNode(data)
  295. if name == "" {
  296. break
  297. }
  298. if name == "position" {
  299. event := response.NewPositionEvent(eventData)
  300. result = append(result, event)
  301. if event != nil && this.OnPositionEvent != nil {
  302. this.OnPositionEvent(this.EventSession, event)
  303. }
  304. }
  305. }
  306. return
  307. }
  308. func (this *EventHandler) HandleRejectedEvent(data string) {
  309. event := response.NewInstructionRejectedEvent(data)
  310. if event != nil && this.OnInstructionRejected != nil {
  311. this.OnInstructionRejected(this.EventSession, event)
  312. }
  313. }
  314. func (this *EventHandler) HandleHistoricEvent(data string) {
  315. event := response.NewHistoricMarketData(data)
  316. if event != nil && this.OnHistoricEvent != nil {
  317. this.OnHistoricEvent(this.EventSession, event)
  318. }
  319. }
  320. func NewEventHandler(session *Session) *EventHandler {
  321. eventHandler := EventHandler{}
  322. eventHandler.EventSession = session
  323. eventHandler.oneExecution = &OneExecutionSM{}
  324. eventHandler.oneExecution.status = oneExecutionInit
  325. eventHandler.oneExecution.handle = &eventHandler
  326. return &eventHandler
  327. }