order_book.go 11 KB


  1. package lmaxapi
  2. import "tickserver/api/lmaxapi/request"
  3. import "tickserver/api/lmaxapi/response"
  4. import "sync"
  5. import "reflect"
  6. import "time"
  7. import "log"
  8. import "errors"
  9. //会发生一个线程写,一个线程读的情况,暂时先全部加锁
  10. type OrderBook struct {
  11. mu sync.Mutex
  12. orderevent func(string, *response.OrderEvent)
  13. positionevent func(string, *response.PositionEvent)
  14. orderbookstatusevent func(string, *response.OrderBookStatusEvent)
  15. rejectedevent func(string, *response.InstructionRejectedEvent)
  16. rejectedevents map[string]func(string, *response.InstructionRejectedEvent)
  17. accountevent func(string, *response.AccountStateEvent)
  18. orderbookevent func(*response.OrderBookEvent)
  19. events map[string][]func(string, *response.OrderEvent)
  20. orderIdToInstId map[string]string
  21. mtf *MtfClient
  22. }
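// When an account logs out and then logs in again, special handling is needed:
//   - prices are simply overwritten;
//   - the account status can likewise simply be set;
//   - orders and positions, however, require an additional "init" event, which
//     is a snapshot of the user's state (the event carrying <seq>0</seq>).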
  28. func NewOrderBook(mtf *Mtf) *OrderBook {
  29. //log.Println("NewOrderBook")
  30. ob := &OrderBook{}
  31. ob.events = make(map[string][]func(string, *response.OrderEvent))
  32. ob.orderIdToInstId = make(map[string]string)
  33. ob.rejectedevents = make(map[string]func(string, *response.InstructionRejectedEvent))
  34. ob.mtf = NewMtfClient(mtf, time.Second, false, 1024)
  35. go ob.Start()
  36. err := ob.mtf.ConnectPrivate()
  37. if err != nil {
  38. log.Println(err)
  39. return nil
  40. }
  41. err = ob.mtf.ConnectPublic()
  42. if err != nil {
  43. log.Println(err)
  44. return nil
  45. }
  46. return ob
  47. }
  48. func (this *OrderBook) Stop() {
  49. this.mtf.Close()
  50. this.mtf.GetMtf().Close()
  51. }
  52. //如果在多线程环境下,或者要修改,那么要采用copy模式
  53. func (this *OrderBook) GetOrders(iscopy bool) []*response.OrderEvent {
  54. return this.GetOrdersUnsafe(iscopy)
  55. }
  56. func (this *OrderBook) GetOrdersUnsafe(iscopy bool) []*response.OrderEvent {
  57. return this.mtf.GetOrders(iscopy)
  58. }
  59. //对某个订单添加监听事件
  60. func (this *OrderBook) AddEvent(instructId string, cb func(status string, event *response.OrderEvent)) {
  61. this.mu.Lock()
  62. defer this.mu.Unlock()
  63. //查找相关的事件 instrunctId
  64. events, id := this.findEvent(instructId)
  65. orders := this.GetOrders(true)
  66. if id == "" { //如果没有找到,那么添加事件
  67. id = instructId
  68. for i := 0; i < len(orders); i++ {
  69. if orders[i].InstructionId == instructId {
  70. this.orderIdToInstId[orders[i].OrderId] = instructId
  71. break
  72. }
  73. }
  74. }
  75. for i := 0; i < len(events); i++ {
  76. if reflect.ValueOf(cb).Pointer() == reflect.ValueOf(events[i]).Pointer() {
  77. return
  78. }
  79. }
  80. this.events[id] = append(events, cb)
  81. }
  82. func (this *OrderBook) AddRejectEvent(instructId string, cb func(status string, event *response.InstructionRejectedEvent)) {
  83. this.mu.Lock()
  84. defer this.mu.Unlock()
  85. this.rejectedevents[instructId] = cb
  86. }
  87. func (this *OrderBook) SetOrderBookEvent(cb func(event *response.OrderBookEvent)) {
  88. this.mu.Lock()
  89. defer this.mu.Unlock()
  90. this.orderbookevent = cb
  91. }
  92. func (this *OrderBook) ClearEvent(instrunctId string, cb func(status string, event *response.OrderEvent)) {
  93. this.mu.Lock()
  94. defer this.mu.Unlock()
  95. events, id := this.findEvent(instrunctId)
  96. if id == "" { //如果没有找到,那么无法删除事件
  97. return
  98. }
  99. for i := 0; i < len(events); i++ {
  100. if reflect.ValueOf(cb).Pointer() == reflect.ValueOf(events[i]).Pointer() {
  101. events = append(events[:i], events[i+1:]...)
  102. break
  103. }
  104. }
  105. this.events[id] = events
  106. }
  107. func (this *OrderBook) SetOrderEvent(cb func(status string, event *response.OrderEvent)) {
  108. this.mu.Lock()
  109. defer this.mu.Unlock()
  110. this.orderevent = cb
  111. }
  112. func (this *OrderBook) SetRejectedEvent(cb func(status string, event *response.InstructionRejectedEvent)) {
  113. this.mu.Lock()
  114. defer this.mu.Unlock()
  115. this.rejectedevent = cb
  116. }
  117. func (this *OrderBook) SetOrderBookStatusEvent(cb func(status string, event *response.OrderBookStatusEvent)) {
  118. this.mu.Lock()
  119. defer this.mu.Unlock()
  120. this.orderbookstatusevent = cb
  121. }
  122. func (this *OrderBook) SetPositionEvent(cb func(status string, event *response.PositionEvent)) {
  123. this.mu.Lock()
  124. defer this.mu.Unlock()
  125. this.positionevent = cb
  126. }
  127. func (this *OrderBook) SetAccountEvent(cb func(status string, event *response.AccountStateEvent)) {
  128. this.mu.Lock()
  129. defer this.mu.Unlock()
  130. this.accountevent = cb
  131. }
  132. func (this *OrderBook) SetRejected(event *response.InstructionRejectedEvent) {
  133. this.mtf.SetRejected(event)
  134. if this.rejectedevent != nil {
  135. this.rejectedevent("SystemUpdate", event)
  136. }
  137. if cb, ok := this.rejectedevents[event.InstructionId]; ok {
  138. cb("SystemUpdate", event)
  139. delete(this.rejectedevents, event.InstructionId)
  140. }
  141. }
  142. func (this *OrderBook) SetObStatus(event *response.OrderBookStatusEvent) {
  143. this.mtf.SetObStatus(event)
  144. if this.orderbookstatusevent != nil {
  145. this.orderbookstatusevent("SystemUpdate", event)
  146. }
  147. }
  148. func (this *OrderBook) Init(event *InitEvent) {
  149. updated := this.mtf.Init(event)
  150. this.eventDispatch("SystemUpdate", updated)
  151. }
  152. //执行作为整体来更新
  153. func (this *OrderBook) SetOneExecution(event *OneExecutionEvent) {
  154. updated := this.mtf.SetOneExecution(event)
  155. this.eventDispatch("SystemUpdate", updated)
  156. }
  157. func (this *OrderBook) SetPosition(event *response.PositionEvent) {
  158. this.mtf.SetPosition(event)
  159. if this.positionevent != nil {
  160. this.positionevent("SystemUpdate", event)
  161. }
  162. }
  163. func (this *OrderBook) SetAccount(account *response.AccountStateEvent) {
  164. this.mtf.SetAccount(account)
  165. if this.accountevent != nil {
  166. this.accountevent("SystemUpdate", account)
  167. }
  168. }
  169. func (this *OrderBook) SetOrder(event *response.OrderEvent) {
  170. this.mtf.SetOrder(event)
  171. this.executeEvent("SystemUpdate", event)
  172. }
  173. //SetExecution
  174. func (this *OrderBook) SetExecution(event *response.ExecutionEvent) {
  175. this.mtf.SetExecution(event)
  176. //this.executeEvent("SystemExecution", event)
  177. }
  178. //更新tick, 这个操作是最频繁的
  179. func (this *OrderBook) UpdateTick(event *response.OrderBookEvent) {
  180. req := &TickEvent{}
  181. req.ob2 = event
  182. req.isCopy = true
  183. req.fetchUpdated = true
  184. updated := this.mtf.UpdateTick(req)
  185. //log.Println(updated)
  186. this.eventDispatch("TickUpdate", updated)
  187. }
  188. //货币对列表
  189. //暂时不做更新
  190. func (this *OrderBook) SetInstrument(event *response.Instrument) {
  191. err := this.mtf.SetInstrument(event)
  192. if err != nil {
  193. log.Println(err)
  194. return
  195. }
  196. }
  197. //暂时不做更新
  198. func (this *OrderBook) SetExchangeRate(event *response.ExchangeRateEvent) {
  199. req := &RateEvent{}
  200. req.erate = event
  201. req.isCopy = true
  202. req.fetchUpdated = true
  203. updated := this.mtf.SetExchangeRate(req)
  204. this.eventDispatch("ExchangeRate", updated)
  205. }
  206. func (this *OrderBook) executeEvent(status string, event *response.OrderEvent) {
  207. id := event.InstructionId
  208. orderId := event.OrderId
  209. if this.orderevent != nil {
  210. this.orderevent(status, event)
  211. }
  212. events, newid := this.findEvent(id)
  213. if newid == "" {
  214. if instId, ok := this.orderIdToInstId[orderId]; ok {
  215. events, newid = this.findEvent(instId)
  216. } else {
  217. return
  218. }
  219. } else {
  220. this.orderIdToInstId[orderId] = newid
  221. }
  222. for i := 0; i < len(events); i++ {
  223. cb := events[i]
  224. cb(status, event)
  225. }
  226. }
  227. func (this *OrderBook) eventDispatch(status string, updated *AccountUpdated) {
  228. if updated == nil {
  229. log.Println("err eventDispatch")
  230. return
  231. }
  232. for i := 0; i < len(updated.orders); i++ {
  233. this.executeEvent(status, updated.orders[i])
  234. }
  235. if this.positionevent != nil {
  236. for i := 0; i < len(updated.positions); i++ {
  237. this.positionevent(status, updated.positions[i])
  238. }
  239. }
  240. if this.accountevent != nil && updated.account != nil {
  241. this.accountevent(status, updated.account)
  242. }
  243. }
  244. func (this *OrderBook) findEvent(id string) ([]func(string, *response.OrderEvent), string) {
  245. if events, ok := this.events[id]; ok {
  246. return events, id
  247. }
  248. return nil, ""
  249. }
  250. func (this *OrderBook) GetAccount() *response.AccountStateEvent {
  251. return this.mtf.GetAccount()
  252. }
  253. func (this *OrderBook) GetOrderBookEvent(id int64) *response.OrderBookEvent {
  254. return this.mtf.GetOb2(id)
  255. }
  256. func (this *OrderBook) GetPosition(id int64) *response.PositionEvent {
  257. return this.mtf.GetPosition(id)
  258. }
  259. func (this *OrderBook) GetPositions(iscopy bool) []*response.PositionEvent {
  260. return this.mtf.GetPositions(iscopy)
  261. }
  262. func (this *OrderBook) GetInstrument(id int64) *response.Instrument {
  263. return this.mtf.GetInstrument(id)
  264. }
  265. func (this *OrderBook) GetInstruments(iscopy bool) []*response.Instrument {
  266. return this.mtf.GetInstruments(iscopy)
  267. }
  268. //从mtf中接受各种各样的信息
  269. func (this *OrderBook) Start() {
  270. // log.Println("go start()")
  271. this.mtf.Recv(func(msg *Message) {
  272. this.dispatch(msg)
  273. })
  274. // log.Println("order book start end")
  275. }
  276. func (this *OrderBook) dispatch(msg *Message) {
  277. this.mu.Lock()
  278. defer this.mu.Unlock()
  279. //do some thing
  280. if msg.Err != nil {
  281. log.Println("msg error:", msg)
  282. return
  283. }
  284. if msg.Type == MsgSetTick && this.orderbookevent != nil {
  285. ob2 := msg.Data.(*AccountUpdated).Req.(*TickEvent).ob2
  286. this.orderbookevent(ob2)
  287. return
  288. }
  289. if msg.Type == MsgSetExchangeRate || msg.Type == MsgSetInstrument {
  290. //内部信息,不对外发布
  291. return
  292. }
  293. // log.Println("order book msg", msg)
  294. }
  295. func (this *OrderBook) GetOffsetByPoint(inst int64, point float64) (float64, error) {
  296. instrument := this.mtf.GetInstrument(inst)
  297. if instrument == nil {
  298. return 0, errors.New("OrderBook InstrumentId error.")
  299. }
  300. return instrument.PriceIncrement * point * 10, nil
  301. }
  302. //order book 交易辅助函数
  303. func PositionTrackingInit(session *Session) error {
  304. //根据lmax的变化规则,一次执行总是以 order position accout 的顺序给出
  305. //所以我合并上述四个监听为一个,这样保证整个执行能完成的更新到mtf。
  306. //这是对思路方法的一个改进:初始化-->执行-->执行,更新着用户账户的变化
  307. //上面三个是交易相关
  308. session.RegisterInitEvent(func(session *Session, event *InitEvent) {
  309. session.GetOrderBook().Init(event)
  310. })
  311. session.RegisterOneExecutionEvent(func(s *Session, event *OneExecutionEvent) {
  312. PrintStruct(event)
  313. session.GetOrderBook().SetOneExecution(event)
  314. })
  315. session.RegisterInstructionRejectedEvent(func(session *Session, event *response.InstructionRejectedEvent) {
  316. session.GetOrderBook().SetRejected(event)
  317. })
  318. //下面三个是行情相关
  319. session.RegisterOrderBookEvent(func(s *Session, event *response.OrderBookEvent) {
  320. if len(event.AskPrices) == 0 || len(event.BidPrices) == 0 {
  321. log.Println("err tick", event)
  322. return
  323. }
  324. session.GetOrderBook().UpdateTick(event)
  325. })
  326. session.RegisterOrderBookStatusEvent(func(s *Session, event *response.OrderBookStatusEvent) {
  327. session.GetOrderBook().SetObStatus(event)
  328. })
  329. session.RegisterExchangeRateEvent(func(session *Session, event *response.ExchangeRateEvent) {
  330. session.GetOrderBook().SetExchangeRate(event)
  331. })
  332. return nil
  333. }
  334. func PositionTrackingSub(session *Session) error {
  335. subs := NewMultiSubscribe()
  336. session.LoadAllInstruments(func(value *response.Instrument) {
  337. session.GetOrderBook().SetInstrument(value)
  338. subs.Add(request.NewOrderBookSubscriptionRequest(value.Id))
  339. if value.Currency != "USD" {
  340. subs.Add(request.NewExchangeRateRequest(value.Currency, "USD"))
  341. }
  342. subs.Add(request.NewInstrumentRequest(value.Id))
  343. subs.Add(request.NewOrderBookStatusRequest(value.Id))
  344. })
  345. subs.Add(request.NewPositionSubscriptionRequest())
  346. subs.Add(request.NewOrderSubscriptionRequest())
  347. subs.Add(request.NewAccountSubscriptionRequest())
  348. return session.Subscribe(subs, nil)
  349. }