mtf.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691
  1. package lmaxapi
  2. //管理整体的一个结构
  3. //接受事件,更新
  4. //接受事件,fetch
  5. import "tickserver/api/lmaxapi/response"
  6. import "errors"
  7. import "log"
  8. import "crypto/rand"
  9. import "encoding/hex"
  10. import "sync"
  11. func getId() string {
  12. var b [5]byte
  13. rand.Read(b[:])
  14. return hex.EncodeToString(b[:])
  15. }
  16. var ErrNoAction = errors.New("ErrNoAction")
  17. type Mtf struct {
  18. ob2 map[int64]*response.OrderBookEvent
  19. obstatus map[int64]*response.OrderBookStatusEvent
  20. instrument map[int64]*response.Instrument
  21. erate map[string]*response.ExchangeRateEvent
  22. accounts map[int64]*Account
  23. accountIds map[int64]int64
  24. clients map[int64]*MtfClient
  25. chrecv chan *Message
  26. server *ChanServer
  27. msgAction []func(mtf *Mtf, msg *Message)
  28. //正在处理中的
  29. pendding map[string]*Message
  30. accountIdToClient map[int64]*MtfClient
  31. mu sync.Mutex
  32. }
  33. const (
  34. AdminClient = iota
  35. LogClient
  36. TradeClient
  37. MatchClient
  38. )
  39. func NewMtf() *Mtf {
  40. mtf := &Mtf{}
  41. mtf.ob2 = make(map[int64]*response.OrderBookEvent)
  42. mtf.instrument = make(map[int64]*response.Instrument)
  43. mtf.erate = make(map[string]*response.ExchangeRateEvent)
  44. mtf.accounts = make(map[int64]*Account)
  45. mtf.accountIds = make(map[int64]int64)
  46. mtf.obstatus = make(map[int64]*response.OrderBookStatusEvent)
  47. mtf.clients = make(map[int64]*MtfClient)
  48. mtf.accountIdToClient = make(map[int64]*MtfClient)
  49. mtf.server = NewChanServer()
  50. mtf.chrecv = make(chan *Message, 10240)
  51. mtf.msgAction = make([]func(mtf *Mtf, msg *Message), MsgCount)
  52. //注册事件回调函数
  53. mtf.bindPrivateAction()
  54. mtf.bindPublicAction()
  55. mtf.getAccountAction()
  56. mtf.getExchangeRateAction()
  57. mtf.getOb2Action()
  58. mtf.getAllOb2Action()
  59. mtf.getInstumentAction()
  60. mtf.getInstrumentsAction()
  61. mtf.getOrdersAction()
  62. mtf.getPositionsAction()
  63. mtf.getPositionAction()
  64. mtf.getInstrumentAction()
  65. mtf.cloneAccountAction()
  66. mtf.setInitAction()
  67. mtf.setOneExecutionAction()
  68. mtf.setTickAction()
  69. mtf.setInstrumentAction()
  70. mtf.setExchangeRateAction()
  71. mtf.setAccountAction()
  72. mtf.setPositionAction()
  73. mtf.setOrderAction()
  74. mtf.setExecutionAction()
  75. mtf.setRejectedAction()
  76. mtf.setObStatusAction()
  77. mtf.echoAction()
  78. mtf.closeAction()
  79. mtf.setAccountDetailAction()
  80. return mtf
  81. }
  82. //为了方便用户通过clientId 查询client ,这里加了一个lock
  83. func (mtf *Mtf) Bind(ty int, id int64, client *MtfClient) {
  84. mtf.mu.Lock()
  85. defer mtf.mu.Unlock()
  86. mtf.clients[id] = client
  87. mtf.server.Bind(ty, id, client)
  88. }
  89. func (mtf *Mtf) AddAccount(account *Account, clientId int64) {
  90. mtf.mu.Lock()
  91. defer mtf.mu.Unlock()
  92. accountId := account.GetId()
  93. //三个映射表:可以根据account id 查询 client
  94. //可以根据client id 查询 account
  95. mtf.accountIds[clientId] = accountId
  96. mtf.accountIdToClient[accountId] = mtf.clients[clientId]
  97. mtf.accounts[accountId] = account
  98. }
  99. func (mtf *Mtf) DeleteAccount(clientId int64) {
  100. mtf.mu.Lock()
  101. defer mtf.mu.Unlock()
  102. accountId, ok := mtf.accountIds[clientId]
  103. if ok {
  104. delete(mtf.accounts, accountId)
  105. delete(mtf.accountIdToClient, accountId)
  106. delete(mtf.accountIds, clientId)
  107. }
  108. }
  109. func (mtf *Mtf) GetAccount(clientId int64) *Account {
  110. mtf.mu.Lock()
  111. defer mtf.mu.Unlock()
  112. id, ok := mtf.accountIds[clientId]
  113. if ok {
  114. return mtf.accounts[id]
  115. }
  116. return nil
  117. }
  118. func (mtf *Mtf) GetAccountById(accountId int64) *Account {
  119. mtf.mu.Lock()
  120. defer mtf.mu.Unlock()
  121. account, ok := mtf.accounts[accountId]
  122. if ok {
  123. return account
  124. }
  125. return nil
  126. }
  127. var ErrClientNotFound = errors.New("ErrClientNotFound")
  128. //可以被多线程查询
  129. func (mtf *Mtf) GetClient(clientId int64) (*MtfClient, error) {
  130. mtf.mu.Lock()
  131. defer mtf.mu.Unlock()
  132. if client, ok := mtf.clients[clientId]; ok {
  133. return client, nil
  134. }
  135. return nil, ErrClientNotFound
  136. }
  137. func (mtf *Mtf) GetClientByAccountId(accountId int64) (*MtfClient, error) {
  138. mtf.mu.Lock()
  139. defer mtf.mu.Unlock()
  140. if client, ok := mtf.accountIdToClient[accountId]; ok {
  141. return client, nil
  142. }
  143. return nil, ErrClientNotFound
  144. }
  145. func (mtf *Mtf) setTick(event *TickEvent, accountId int64) (ret *AccountUpdated) {
  146. if len(event.ob2.AskPrices) == 0 || len(event.ob2.BidPrices) == 0 {
  147. return nil
  148. }
  149. mtf.ob2[event.ob2.InstrumentId] = event.ob2
  150. for _, account := range mtf.accounts {
  151. if account.GetId() == accountId {
  152. ret = account.UpdateTick(event)
  153. continue
  154. }
  155. account.UpdateTick(event)
  156. }
  157. return
  158. }
  159. func (mtf *Mtf) setInitAction() {
  160. mtf.RegisterAction(MsgInit, func(mtf *Mtf, msg *Message) {
  161. data := msg.Data.(*InitEvent)
  162. account := mtf.GetAccount(msg.ClientId)
  163. if account == nil {
  164. account = NewAccount(mtf, nil, nil, msg.ClientId)
  165. // PrintStruct(data)
  166. // PrintStruct(data.Account)
  167. account.SetState(data.Account)
  168. account.SetDetail(data.AccountDetail)
  169. mtf.AddAccount(account, msg.ClientId)
  170. } else {
  171. account.SetState(data.Account)
  172. account.SetDetail(data.AccountDetail)
  173. }
  174. //初始化
  175. result := account.Init(data)
  176. if result == nil {
  177. msg.Err = errors.New("err init")
  178. return
  179. }
  180. result.Req = msg.Data
  181. msg.Data = result
  182. })
  183. }
  184. //MsgSetOneExecution
  185. func (mtf *Mtf) setOneExecutionAction() {
  186. mtf.RegisterAction(MsgSetOneExecution, func(mtf *Mtf, msg *Message) {
  187. data := msg.Data.(*OneExecutionEvent)
  188. account := mtf.GetAccount(msg.ClientId)
  189. if account == nil {
  190. msg.Err = errors.New("err get account MsgSetOneExecution")
  191. return
  192. }
  193. //初始化
  194. result := account.SetOneExecution(data)
  195. if result == nil {
  196. msg.Err = errors.New("err update account MsgSetOneExecution")
  197. return
  198. }
  199. result.Req = msg.Data
  200. msg.Data = result
  201. mtf.server.SendPrivate(msg)
  202. })
  203. }
  204. func (mtf *Mtf) SendPrivate(msg *Message) error {
  205. return mtf.server.SendPrivate(msg)
  206. }
  207. type Instructioner interface {
  208. GetId() string
  209. SetId(string)
  210. Clone() interface{}
  211. SetAccountId(int64)
  212. }
  213. func (mtf *Mtf) setTickAction() {
  214. mtf.RegisterAction(MsgSetTick, func(mtf *Mtf, msg *Message) {
  215. account := mtf.GetAccount(msg.ClientId)
  216. data := msg.Data.(*TickEvent)
  217. msg.Flag |= Public
  218. //特殊情况,没有account的时候,不需要报告account的情况
  219. result := &AccountUpdated{}
  220. if account == nil {
  221. mtf.setTick(data, 0)
  222. if len(data.ob2.AskPrices) == 0 || len(data.ob2.BidPrices) == 0 {
  223. msg.Err = errors.New("err tick")
  224. }
  225. } else {
  226. result = mtf.setTick(data, account.GetId())
  227. if result == nil {
  228. msg.Err = errors.New("err tick")
  229. }
  230. }
  231. result.Req = msg.Data
  232. msg.Data = result
  233. mtf.server.SendPublic(msg)
  234. })
  235. }
  236. //这个函数会产生等待,所以一般不会使用,mtf 要使用异步版本
  237. func (mtf *Mtf) SendMessage(clientId int64, msgType int, data interface{}) *Message {
  238. //准备
  239. msg := NewMessage(msgType, clientId, data, 0)
  240. msg.Ch = make(chan *Message, 1)
  241. mtf.Send(msg)
  242. //接受
  243. recvmsg := <-msg.Ch
  244. return recvmsg
  245. }
  246. func (mtf *Mtf) SendMessageAsyn(clientId int64, msgType int, data interface{}) *Message {
  247. msg := NewMessage(msgType, clientId, data, 0)
  248. msg.Flag |= SendAsyn
  249. //发送
  250. mtf.Send(msg)
  251. //接受
  252. return msg
  253. }
  254. func (mtf *Mtf) setInstrument(event *response.Instrument) {
  255. mtf.instrument[event.Id] = event
  256. }
  257. func (mtf *Mtf) setObStatus(event *response.OrderBookStatusEvent) {
  258. mtf.obstatus[event.InstrumentId] = event
  259. }
  260. func (mtf *Mtf) setInstrumentAction() {
  261. mtf.RegisterAction(MsgSetInstrument, func(mtf *Mtf, msg *Message) {
  262. data := msg.Data.(*response.Instrument)
  263. mtf.setInstrument(data)
  264. msg.Flag |= Public
  265. mtf.server.SendPublic(msg)
  266. })
  267. }
  268. func (mtf *Mtf) setObStatusAction() {
  269. mtf.RegisterAction(MsgSetObStatus, func(mtf *Mtf, msg *Message) {
  270. data := msg.Data.(*response.OrderBookStatusEvent)
  271. mtf.setObStatus(data)
  272. msg.Flag |= Public
  273. mtf.server.SendPublic(msg)
  274. })
  275. }
  276. func (mtf *Mtf) bindPrivateAction() {
  277. mtf.RegisterAction(MsgMtfBindPrivate, func(mtf *Mtf, msg *Message) {
  278. data := msg.Data.(*MtfClient)
  279. mtf.Bind(Private, msg.ClientId, data)
  280. })
  281. }
  282. func (mtf *Mtf) bindPublicAction() {
  283. mtf.RegisterAction(MsgMtfBindPublic, func(mtf *Mtf, msg *Message) {
  284. data := msg.Data.(*MtfClient)
  285. mtf.Bind(Public, msg.ClientId, data)
  286. })
  287. }
  288. func (mtf *Mtf) setExchangeRate(event *RateEvent, accountId int64) (ret *AccountUpdated) {
  289. mtf.erate[event.erate.From+"/"+event.erate.To] = event.erate
  290. for _, account := range mtf.accounts {
  291. if account.GetId() == accountId {
  292. ret = account.UpdateRate(event)
  293. continue
  294. }
  295. account.UpdateRate(event)
  296. }
  297. return
  298. }
  299. func (mtf *Mtf) setExchangeRateAction() {
  300. mtf.RegisterAction(MsgSetExchangeRate, func(mtf *Mtf, msg *Message) {
  301. data := msg.Data.(*RateEvent)
  302. account := mtf.GetAccount(msg.ClientId)
  303. msg.Flag |= Public
  304. result := &AccountUpdated{}
  305. if account == nil {
  306. mtf.setExchangeRate(data, 0)
  307. } else {
  308. result = mtf.setExchangeRate(data, account.GetId())
  309. if result == nil {
  310. msg.Err = errors.New("err erate")
  311. }
  312. }
  313. result.Req = msg.Data
  314. msg.Data = result
  315. mtf.server.SendPublic(msg)
  316. })
  317. }
  318. func (mtf *Mtf) getAllRate() []*response.ExchangeRateEvent {
  319. var rates []*response.ExchangeRateEvent
  320. for _, rate := range mtf.erate {
  321. rates = append(rates, rate)
  322. }
  323. return rates
  324. }
  325. func (mtf *Mtf) getExchangeRate(symbol string) *response.ExchangeRateEvent {
  326. if rate, ok := mtf.erate[symbol+"/"+"USD"]; ok {
  327. return rate
  328. }
  329. return nil
  330. }
  331. func (mtf *Mtf) getExchangeRateAction() {
  332. mtf.RegisterAction(MsgGetExchangeRate, func(mtf *Mtf, msg *Message) {
  333. data := msg.Data.(string)
  334. rate := mtf.getExchangeRate(data)
  335. msg.Data = rate
  336. })
  337. }
  338. func (mtf *Mtf) getOrdersAction() {
  339. mtf.RegisterAction(MsgGetOrders, func(mtf *Mtf, msg *Message) {
  340. iscopy := msg.Data.(bool)
  341. account := mtf.GetAccount(msg.ClientId)
  342. if account == nil {
  343. msg.Err = errors.New("getOrdersAction::accout not init")
  344. return
  345. }
  346. data := account.orderList.GetOrders(iscopy)
  347. msg.Data = data
  348. })
  349. }
  350. func (mtf *Mtf) getPositionsAction() {
  351. mtf.RegisterAction(MsgGetPositions, func(mtf *Mtf, msg *Message) {
  352. iscopy := msg.Data.(bool)
  353. account := mtf.GetAccount(msg.ClientId)
  354. if account == nil {
  355. msg.Err = errors.New("getPositionsAction::accout not init")
  356. return
  357. }
  358. data := account.positionList.GetPositions(iscopy)
  359. msg.Data = data
  360. })
  361. }
  362. func (mtf *Mtf) getPositionAction() {
  363. mtf.RegisterAction(MsgGetPosition, func(mtf *Mtf, msg *Message) {
  364. id := msg.Data.(int64)
  365. account := mtf.GetAccount(msg.ClientId)
  366. if account == nil {
  367. msg.Err = errors.New("getPositionAction::accout not init")
  368. return
  369. }
  370. data := account.positionList.GetPosition(id)
  371. msg.Data = data
  372. })
  373. }
  374. func (mtf *Mtf) echoAction() {
  375. mtf.RegisterAction(MsgEcho, func(mtf *Mtf, msg *Message) {
  376. })
  377. }
  378. func replyMessage(msg *Message) {
  379. select {
  380. case msg.Ch <- msg:
  381. //tracelog.Println("reply", msg.Id)
  382. default:
  383. log.Println("reply msg error", msg)
  384. }
  385. }
  386. func (mtf *Mtf) setRejectedAction() {
  387. mtf.RegisterAction(MsgSetRejected, func(mtf *Mtf, msg *Message) {
  388. data := msg.Data.(*response.InstructionRejectedEvent)
  389. account := mtf.GetAccount(msg.ClientId)
  390. if account == nil {
  391. msg.Err = errors.New("setRejectedAction::accout not init")
  392. return
  393. }
  394. account.SetRejected(data)
  395. mtf.server.SendPrivate(msg)
  396. })
  397. }
  398. func (mtf *Mtf) closeAction() {
  399. mtf.RegisterAction(MsgClose, func(mtf *Mtf, msg *Message) {
  400. mtf.server.UnBind(Private, msg.ClientId)
  401. mtf.server.UnBind(Public, msg.ClientId)
  402. })
  403. }
  404. func (mtf *Mtf) setAccountAction() {
  405. mtf.RegisterAction(MsgSetAccount, func(mtf *Mtf, msg *Message) {
  406. data := msg.Data.(*response.AccountStateEvent)
  407. account := mtf.GetAccount(msg.ClientId)
  408. //单件模式,只可能被设置一次。
  409. if account == nil {
  410. log.Println("setAccountAction", msg.ClientId, account, data)
  411. account = NewAccount(mtf, nil, nil, msg.ClientId)
  412. account.SetState(data)
  413. mtf.AddAccount(account, msg.ClientId)
  414. }
  415. })
  416. }
  417. func (mtf *Mtf) setAccountDetailAction() {
  418. mtf.RegisterAction(MsgSetAccountDetails, func(mtf *Mtf, msg *Message) {
  419. data := msg.Data.(*response.AccountDetails)
  420. account := mtf.GetAccount(msg.ClientId)
  421. //单件模式,只可能被设置一次。
  422. if account == nil {
  423. msg.Err = errors.New("setAccountDetailAction::accout not init")
  424. return
  425. }
  426. account.SetDetail(data)
  427. })
  428. }
  429. func (mtf *Mtf) setPositionAction() {
  430. mtf.RegisterAction(MsgSetPosition, func(mtf *Mtf, msg *Message) {
  431. data := msg.Data.(*response.PositionEvent)
  432. account := mtf.GetAccount(msg.ClientId)
  433. if account == nil {
  434. msg.Err = errors.New("setPositionAction::accout not init")
  435. return
  436. }
  437. account.SetPosition(data)
  438. })
  439. }
  440. func (mtf *Mtf) setOrderAction() {
  441. mtf.RegisterAction(MsgSetOrder, func(mtf *Mtf, msg *Message) {
  442. data := msg.Data.(*response.OrderEvent)
  443. account := mtf.GetAccount(msg.ClientId)
  444. if account == nil {
  445. msg.Err = errors.New("setOrderAction::accout not init")
  446. return
  447. }
  448. account.SetOrder(data)
  449. })
  450. }
  451. //MsgSetExecution
  452. func (mtf *Mtf) setExecutionAction() {
  453. mtf.RegisterAction(MsgSetExecution, func(mtf *Mtf, msg *Message) {
  454. data := msg.Data.(*response.ExecutionEvent)
  455. account := mtf.GetAccount(msg.ClientId)
  456. if account == nil {
  457. msg.Err = errors.New("setExecutionAction::accout not init")
  458. return
  459. }
  460. account.SetExecution(data)
  461. })
  462. }
  463. func (mtf *Mtf) getInstrumentInfo(inst int64) (instrument *response.Instrument, erate *response.ExchangeRateEvent,
  464. ob2 *response.OrderBookEvent, err error) {
  465. var ok bool
  466. instrument, ok = mtf.instrument[inst]
  467. if !ok {
  468. err = errors.New("order InstrumentId error.")
  469. return
  470. }
  471. if instrument.Currency != "USD" {
  472. erate, ok = mtf.erate[instrument.Currency+"/"+"USD"]
  473. if !ok {
  474. err = errors.New("exchangeRate not found.")
  475. return
  476. }
  477. }
  478. ob2, ok = mtf.ob2[inst]
  479. if !ok {
  480. err = errors.New("lastOrderBookEvent error.")
  481. return
  482. }
  483. return instrument, erate, ob2, nil
  484. }
  485. func (mtf *Mtf) getOb2(id int64) *response.OrderBookEvent {
  486. return mtf.ob2[id]
  487. }
  488. func (mtf *Mtf) getOb2Action() {
  489. mtf.RegisterAction(MsgGetTick, func(mtf *Mtf, msg *Message) {
  490. data := msg.Data.(int64)
  491. ob2 := mtf.getOb2(data)
  492. msg.Data = ob2
  493. })
  494. }
  495. func (mtf *Mtf) getInstrumentAction() {
  496. mtf.RegisterAction(MsgGetInstrument, func(mtf *Mtf, msg *Message) {
  497. data := msg.Data.(int64)
  498. inst := mtf.instrument[data]
  499. msg.Data = inst
  500. })
  501. }
  502. func (mtf *Mtf) getInstrumentsAction() {
  503. mtf.RegisterAction(MsgGetInstruments, func(mtf *Mtf, msg *Message) {
  504. iscopy := msg.Data.(bool)
  505. insts := mtf.getAllInstrument(iscopy)
  506. msg.Data = insts
  507. })
  508. }
  509. func (mtf *Mtf) getAllOb2() (ret []*response.OrderBookEvent) {
  510. for _, price := range mtf.ob2 {
  511. ret = append(ret, price)
  512. }
  513. return ret
  514. }
  515. func (mtf *Mtf) getAllOb2Action() {
  516. mtf.RegisterAction(MsgGetAllTick, func(mtf *Mtf, msg *Message) {
  517. ob2 := mtf.getAllOb2()
  518. msg.Data = ob2
  519. })
  520. }
  521. func (mtf *Mtf) getInstument(id int64) *response.Instrument {
  522. data, ok := mtf.instrument[id]
  523. if ok {
  524. return data
  525. }
  526. return nil
  527. }
  528. func (mtf *Mtf) cloneAccountAction() {
  529. mtf.RegisterAction(MsgCloneAccount, func(mtf *Mtf, msg *Message) {
  530. isCopyInsts := msg.Data.(bool)
  531. info := &AccountInfo{}
  532. account := mtf.GetAccount(msg.ClientId)
  533. if account == nil {
  534. msg.Err = errors.New("cloneAccountAction::accout not init")
  535. return
  536. }
  537. if isCopyInsts {
  538. info.ob2 = mtf.getAllOb2()
  539. info.erate = mtf.getAllRate()
  540. }
  541. orders := &response.Orders{}
  542. orders.Data = account.orderList.GetOrders(true)
  543. orders.HasMoreResults = false
  544. positions := &response.Positions{}
  545. positions.Data = account.positionList.GetPositions(true)
  546. positions.HasMoreResults = false
  547. info.accountState = account.GetState()
  548. info.orders = orders
  549. info.positions = positions
  550. msg.Data = info
  551. })
  552. }
  553. func (mtf *Mtf) getAccountAction() {
  554. mtf.RegisterAction(MsgGetAccount, func(mtf *Mtf, msg *Message) {
  555. account := mtf.GetAccount(msg.ClientId)
  556. if account == nil {
  557. msg.Err = errors.New("getAccountAction::accout not init")
  558. return
  559. }
  560. msg.Data = account.GetState()
  561. })
  562. }
  563. func (mtf *Mtf) getInstumentAction() {
  564. mtf.RegisterAction(MsgGetInstument, func(mtf *Mtf, msg *Message) {
  565. data := msg.Data.(int64)
  566. ret := mtf.getInstument(data)
  567. msg.Data = ret
  568. })
  569. }
  570. func (mtf *Mtf) getAllInstrument(iscopy bool) (ret []*response.Instrument) {
  571. for _, v := range mtf.instrument {
  572. if iscopy {
  573. tmp := *v
  574. ret = append(ret, &tmp)
  575. }
  576. ret = append(ret, v)
  577. }
  578. return ret
  579. }
  580. //Start 中的所有函数必须是非阻塞的
  581. func (mtf *Mtf) Start() {
  582. //log.Println("mtf is ready")
  583. for {
  584. msg, ok := <-mtf.chrecv
  585. if !ok || msg == nil {
  586. break
  587. }
  588. //关闭mtf
  589. if msg.Type == MsgShutDown {
  590. mtf.Close()
  591. replyMessage(msg)
  592. break
  593. }
  594. action := mtf.msgAction[msg.Type]
  595. if action == nil {
  596. msg.Err = ErrNoAction
  597. replyMessage(msg) //向客户端回复
  598. // log.Println("[MTF_ACTION_ERROR]", msg)
  599. continue
  600. }
  601. if msg.Type != MsgSetTick && msg.Type != MsgSetExchangeRate && msg.Type != MsgMtfBindPublic && msg.Type != MsgMtfBindPrivate {
  602. // log.Println("[mtf_action_ok]", msg)
  603. }
  604. action(mtf, msg)
  605. if msg.Err != nil {
  606. // log.Println(msg.Err, msg.Data)
  607. }
  608. replyMessage(msg) //向客户端回复
  609. }
  610. // log.Println("mtf is closed")
  611. }
  612. func (mtf *Mtf) Close() error {
  613. mtf.chrecv <- nil
  614. close(mtf.chrecv)
  615. return nil
  616. }
  617. func (mtf *Mtf) Send(msg *Message) {
  618. if debug {
  619. msg.SendTime = getTime()
  620. }
  621. mtf.chrecv <- msg
  622. }
  623. func (mtf *Mtf) RegisterAction(ty int, cb func(mtf *Mtf, msg *Message)) {
  624. mtf.msgAction[ty] = cb
  625. }