123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691 |
- package lmaxapi
- //管理整体的一个结构
- //接受事件,更新
- //接受事件,fetch
- import "tickserver/api/lmaxapi/response"
- import "errors"
- import "log"
- import "crypto/rand"
- import "encoding/hex"
- import "sync"
- func getId() string {
- var b [5]byte
- rand.Read(b[:])
- return hex.EncodeToString(b[:])
- }
// ErrNoAction is set on a message by Start when no action is registered
// for the message's type.
var ErrNoAction = errors.New("ErrNoAction")
- type Mtf struct {
- ob2 map[int64]*response.OrderBookEvent
- obstatus map[int64]*response.OrderBookStatusEvent
- instrument map[int64]*response.Instrument
- erate map[string]*response.ExchangeRateEvent
- accounts map[int64]*Account
- accountIds map[int64]int64
- clients map[int64]*MtfClient
- chrecv chan *Message
- server *ChanServer
- msgAction []func(mtf *Mtf, msg *Message)
- //正在处理中的
- pendding map[string]*Message
- accountIdToClient map[int64]*MtfClient
- mu sync.Mutex
- }
// Client-kind constants (judging by their names), numbered consecutively
// from zero.
const (
	AdminClient = iota // 0
	LogClient          // 1
	TradeClient        // 2
	MatchClient        // 3
)
- func NewMtf() *Mtf {
- mtf := &Mtf{}
- mtf.ob2 = make(map[int64]*response.OrderBookEvent)
- mtf.instrument = make(map[int64]*response.Instrument)
- mtf.erate = make(map[string]*response.ExchangeRateEvent)
- mtf.accounts = make(map[int64]*Account)
- mtf.accountIds = make(map[int64]int64)
- mtf.obstatus = make(map[int64]*response.OrderBookStatusEvent)
- mtf.clients = make(map[int64]*MtfClient)
- mtf.accountIdToClient = make(map[int64]*MtfClient)
- mtf.server = NewChanServer()
- mtf.chrecv = make(chan *Message, 10240)
- mtf.msgAction = make([]func(mtf *Mtf, msg *Message), MsgCount)
- //注册事件回调函数
- mtf.bindPrivateAction()
- mtf.bindPublicAction()
- mtf.getAccountAction()
- mtf.getExchangeRateAction()
- mtf.getOb2Action()
- mtf.getAllOb2Action()
- mtf.getInstumentAction()
- mtf.getInstrumentsAction()
- mtf.getOrdersAction()
- mtf.getPositionsAction()
- mtf.getPositionAction()
- mtf.getInstrumentAction()
- mtf.cloneAccountAction()
- mtf.setInitAction()
- mtf.setOneExecutionAction()
- mtf.setTickAction()
- mtf.setInstrumentAction()
- mtf.setExchangeRateAction()
- mtf.setAccountAction()
- mtf.setPositionAction()
- mtf.setOrderAction()
- mtf.setExecutionAction()
- mtf.setRejectedAction()
- mtf.setObStatusAction()
- mtf.echoAction()
- mtf.closeAction()
- mtf.setAccountDetailAction()
- return mtf
- }
- //为了方便用户通过clientId 查询client ,这里加了一个lock
- func (mtf *Mtf) Bind(ty int, id int64, client *MtfClient) {
- mtf.mu.Lock()
- defer mtf.mu.Unlock()
- mtf.clients[id] = client
- mtf.server.Bind(ty, id, client)
- }
- func (mtf *Mtf) AddAccount(account *Account, clientId int64) {
- mtf.mu.Lock()
- defer mtf.mu.Unlock()
- accountId := account.GetId()
- //三个映射表:可以根据account id 查询 client
- //可以根据client id 查询 account
- mtf.accountIds[clientId] = accountId
- mtf.accountIdToClient[accountId] = mtf.clients[clientId]
- mtf.accounts[accountId] = account
- }
- func (mtf *Mtf) DeleteAccount(clientId int64) {
- mtf.mu.Lock()
- defer mtf.mu.Unlock()
- accountId, ok := mtf.accountIds[clientId]
- if ok {
- delete(mtf.accounts, accountId)
- delete(mtf.accountIdToClient, accountId)
- delete(mtf.accountIds, clientId)
- }
- }
- func (mtf *Mtf) GetAccount(clientId int64) *Account {
- mtf.mu.Lock()
- defer mtf.mu.Unlock()
- id, ok := mtf.accountIds[clientId]
- if ok {
- return mtf.accounts[id]
- }
- return nil
- }
- func (mtf *Mtf) GetAccountById(accountId int64) *Account {
- mtf.mu.Lock()
- defer mtf.mu.Unlock()
- account, ok := mtf.accounts[accountId]
- if ok {
- return account
- }
- return nil
- }
// ErrClientNotFound is returned by the client lookups when no client is
// registered under the requested id.
var ErrClientNotFound = errors.New("ErrClientNotFound")
- //可以被多线程查询
- func (mtf *Mtf) GetClient(clientId int64) (*MtfClient, error) {
- mtf.mu.Lock()
- defer mtf.mu.Unlock()
- if client, ok := mtf.clients[clientId]; ok {
- return client, nil
- }
- return nil, ErrClientNotFound
- }
- func (mtf *Mtf) GetClientByAccountId(accountId int64) (*MtfClient, error) {
- mtf.mu.Lock()
- defer mtf.mu.Unlock()
- if client, ok := mtf.accountIdToClient[accountId]; ok {
- return client, nil
- }
- return nil, ErrClientNotFound
- }
- func (mtf *Mtf) setTick(event *TickEvent, accountId int64) (ret *AccountUpdated) {
- if len(event.ob2.AskPrices) == 0 || len(event.ob2.BidPrices) == 0 {
- return nil
- }
- mtf.ob2[event.ob2.InstrumentId] = event.ob2
- for _, account := range mtf.accounts {
- if account.GetId() == accountId {
- ret = account.UpdateTick(event)
- continue
- }
- account.UpdateTick(event)
- }
- return
- }
- func (mtf *Mtf) setInitAction() {
- mtf.RegisterAction(MsgInit, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*InitEvent)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- account = NewAccount(mtf, nil, nil, msg.ClientId)
- // PrintStruct(data)
- // PrintStruct(data.Account)
- account.SetState(data.Account)
- account.SetDetail(data.AccountDetail)
- mtf.AddAccount(account, msg.ClientId)
- } else {
- account.SetState(data.Account)
- account.SetDetail(data.AccountDetail)
- }
- //初始化
- result := account.Init(data)
- if result == nil {
- msg.Err = errors.New("err init")
- return
- }
- result.Req = msg.Data
- msg.Data = result
- })
- }
- //MsgSetOneExecution
- func (mtf *Mtf) setOneExecutionAction() {
- mtf.RegisterAction(MsgSetOneExecution, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*OneExecutionEvent)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("err get account MsgSetOneExecution")
- return
- }
- //初始化
- result := account.SetOneExecution(data)
- if result == nil {
- msg.Err = errors.New("err update account MsgSetOneExecution")
- return
- }
- result.Req = msg.Data
- msg.Data = result
- mtf.server.SendPrivate(msg)
- })
- }
- func (mtf *Mtf) SendPrivate(msg *Message) error {
- return mtf.server.SendPrivate(msg)
- }
// Instructioner is implemented by values that carry a string id, can be
// given an account id, and can be cloned.
type Instructioner interface {
	// GetId returns the current id.
	GetId() string
	// SetId replaces the id.
	SetId(string)
	// SetAccountId sets the account id.
	SetAccountId(int64)
	// Clone returns a copy; presumably of the same concrete type — confirm
	// against the implementations.
	Clone() interface{}
}
- func (mtf *Mtf) setTickAction() {
- mtf.RegisterAction(MsgSetTick, func(mtf *Mtf, msg *Message) {
- account := mtf.GetAccount(msg.ClientId)
- data := msg.Data.(*TickEvent)
- msg.Flag |= Public
- //特殊情况,没有account的时候,不需要报告account的情况
- result := &AccountUpdated{}
- if account == nil {
- mtf.setTick(data, 0)
- if len(data.ob2.AskPrices) == 0 || len(data.ob2.BidPrices) == 0 {
- msg.Err = errors.New("err tick")
- }
- } else {
- result = mtf.setTick(data, account.GetId())
- if result == nil {
- msg.Err = errors.New("err tick")
- }
- }
- result.Req = msg.Data
- msg.Data = result
- mtf.server.SendPublic(msg)
- })
- }
- //这个函数会产生等待,所以一般不会使用,mtf 要使用异步版本
- func (mtf *Mtf) SendMessage(clientId int64, msgType int, data interface{}) *Message {
- //准备
- msg := NewMessage(msgType, clientId, data, 0)
- msg.Ch = make(chan *Message, 1)
- mtf.Send(msg)
- //接受
- recvmsg := <-msg.Ch
- return recvmsg
- }
- func (mtf *Mtf) SendMessageAsyn(clientId int64, msgType int, data interface{}) *Message {
- msg := NewMessage(msgType, clientId, data, 0)
- msg.Flag |= SendAsyn
- //发送
- mtf.Send(msg)
- //接受
- return msg
- }
- func (mtf *Mtf) setInstrument(event *response.Instrument) {
- mtf.instrument[event.Id] = event
- }
- func (mtf *Mtf) setObStatus(event *response.OrderBookStatusEvent) {
- mtf.obstatus[event.InstrumentId] = event
- }
- func (mtf *Mtf) setInstrumentAction() {
- mtf.RegisterAction(MsgSetInstrument, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*response.Instrument)
- mtf.setInstrument(data)
- msg.Flag |= Public
- mtf.server.SendPublic(msg)
- })
- }
- func (mtf *Mtf) setObStatusAction() {
- mtf.RegisterAction(MsgSetObStatus, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*response.OrderBookStatusEvent)
- mtf.setObStatus(data)
- msg.Flag |= Public
- mtf.server.SendPublic(msg)
- })
- }
- func (mtf *Mtf) bindPrivateAction() {
- mtf.RegisterAction(MsgMtfBindPrivate, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*MtfClient)
- mtf.Bind(Private, msg.ClientId, data)
- })
- }
- func (mtf *Mtf) bindPublicAction() {
- mtf.RegisterAction(MsgMtfBindPublic, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*MtfClient)
- mtf.Bind(Public, msg.ClientId, data)
- })
- }
- func (mtf *Mtf) setExchangeRate(event *RateEvent, accountId int64) (ret *AccountUpdated) {
- mtf.erate[event.erate.From+"/"+event.erate.To] = event.erate
- for _, account := range mtf.accounts {
- if account.GetId() == accountId {
- ret = account.UpdateRate(event)
- continue
- }
- account.UpdateRate(event)
- }
- return
- }
- func (mtf *Mtf) setExchangeRateAction() {
- mtf.RegisterAction(MsgSetExchangeRate, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*RateEvent)
- account := mtf.GetAccount(msg.ClientId)
- msg.Flag |= Public
- result := &AccountUpdated{}
- if account == nil {
- mtf.setExchangeRate(data, 0)
- } else {
- result = mtf.setExchangeRate(data, account.GetId())
- if result == nil {
- msg.Err = errors.New("err erate")
- }
- }
- result.Req = msg.Data
- msg.Data = result
- mtf.server.SendPublic(msg)
- })
- }
- func (mtf *Mtf) getAllRate() []*response.ExchangeRateEvent {
- var rates []*response.ExchangeRateEvent
- for _, rate := range mtf.erate {
- rates = append(rates, rate)
- }
- return rates
- }
- func (mtf *Mtf) getExchangeRate(symbol string) *response.ExchangeRateEvent {
- if rate, ok := mtf.erate[symbol+"/"+"USD"]; ok {
- return rate
- }
- return nil
- }
- func (mtf *Mtf) getExchangeRateAction() {
- mtf.RegisterAction(MsgGetExchangeRate, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(string)
- rate := mtf.getExchangeRate(data)
- msg.Data = rate
- })
- }
- func (mtf *Mtf) getOrdersAction() {
- mtf.RegisterAction(MsgGetOrders, func(mtf *Mtf, msg *Message) {
- iscopy := msg.Data.(bool)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("getOrdersAction::accout not init")
- return
- }
- data := account.orderList.GetOrders(iscopy)
- msg.Data = data
- })
- }
- func (mtf *Mtf) getPositionsAction() {
- mtf.RegisterAction(MsgGetPositions, func(mtf *Mtf, msg *Message) {
- iscopy := msg.Data.(bool)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("getPositionsAction::accout not init")
- return
- }
- data := account.positionList.GetPositions(iscopy)
- msg.Data = data
- })
- }
- func (mtf *Mtf) getPositionAction() {
- mtf.RegisterAction(MsgGetPosition, func(mtf *Mtf, msg *Message) {
- id := msg.Data.(int64)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("getPositionAction::accout not init")
- return
- }
- data := account.positionList.GetPosition(id)
- msg.Data = data
- })
- }
- func (mtf *Mtf) echoAction() {
- mtf.RegisterAction(MsgEcho, func(mtf *Mtf, msg *Message) {
- })
- }
- func replyMessage(msg *Message) {
- select {
- case msg.Ch <- msg:
- //tracelog.Println("reply", msg.Id)
- default:
- log.Println("reply msg error", msg)
- }
- }
- func (mtf *Mtf) setRejectedAction() {
- mtf.RegisterAction(MsgSetRejected, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*response.InstructionRejectedEvent)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("setRejectedAction::accout not init")
- return
- }
- account.SetRejected(data)
- mtf.server.SendPrivate(msg)
- })
- }
- func (mtf *Mtf) closeAction() {
- mtf.RegisterAction(MsgClose, func(mtf *Mtf, msg *Message) {
- mtf.server.UnBind(Private, msg.ClientId)
- mtf.server.UnBind(Public, msg.ClientId)
- })
- }
- func (mtf *Mtf) setAccountAction() {
- mtf.RegisterAction(MsgSetAccount, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*response.AccountStateEvent)
- account := mtf.GetAccount(msg.ClientId)
- //单件模式,只可能被设置一次。
- if account == nil {
- log.Println("setAccountAction", msg.ClientId, account, data)
- account = NewAccount(mtf, nil, nil, msg.ClientId)
- account.SetState(data)
- mtf.AddAccount(account, msg.ClientId)
- }
- })
- }
- func (mtf *Mtf) setAccountDetailAction() {
- mtf.RegisterAction(MsgSetAccountDetails, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*response.AccountDetails)
- account := mtf.GetAccount(msg.ClientId)
- //单件模式,只可能被设置一次。
- if account == nil {
- msg.Err = errors.New("setAccountDetailAction::accout not init")
- return
- }
- account.SetDetail(data)
- })
- }
- func (mtf *Mtf) setPositionAction() {
- mtf.RegisterAction(MsgSetPosition, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*response.PositionEvent)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("setPositionAction::accout not init")
- return
- }
- account.SetPosition(data)
- })
- }
- func (mtf *Mtf) setOrderAction() {
- mtf.RegisterAction(MsgSetOrder, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*response.OrderEvent)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("setOrderAction::accout not init")
- return
- }
- account.SetOrder(data)
- })
- }
- //MsgSetExecution
- func (mtf *Mtf) setExecutionAction() {
- mtf.RegisterAction(MsgSetExecution, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(*response.ExecutionEvent)
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("setExecutionAction::accout not init")
- return
- }
- account.SetExecution(data)
- })
- }
- func (mtf *Mtf) getInstrumentInfo(inst int64) (instrument *response.Instrument, erate *response.ExchangeRateEvent,
- ob2 *response.OrderBookEvent, err error) {
- var ok bool
- instrument, ok = mtf.instrument[inst]
- if !ok {
- err = errors.New("order InstrumentId error.")
- return
- }
- if instrument.Currency != "USD" {
- erate, ok = mtf.erate[instrument.Currency+"/"+"USD"]
- if !ok {
- err = errors.New("exchangeRate not found.")
- return
- }
- }
- ob2, ok = mtf.ob2[inst]
- if !ok {
- err = errors.New("lastOrderBookEvent error.")
- return
- }
- return instrument, erate, ob2, nil
- }
- func (mtf *Mtf) getOb2(id int64) *response.OrderBookEvent {
- return mtf.ob2[id]
- }
- func (mtf *Mtf) getOb2Action() {
- mtf.RegisterAction(MsgGetTick, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(int64)
- ob2 := mtf.getOb2(data)
- msg.Data = ob2
- })
- }
- func (mtf *Mtf) getInstrumentAction() {
- mtf.RegisterAction(MsgGetInstrument, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(int64)
- inst := mtf.instrument[data]
- msg.Data = inst
- })
- }
- func (mtf *Mtf) getInstrumentsAction() {
- mtf.RegisterAction(MsgGetInstruments, func(mtf *Mtf, msg *Message) {
- iscopy := msg.Data.(bool)
- insts := mtf.getAllInstrument(iscopy)
- msg.Data = insts
- })
- }
- func (mtf *Mtf) getAllOb2() (ret []*response.OrderBookEvent) {
- for _, price := range mtf.ob2 {
- ret = append(ret, price)
- }
- return ret
- }
- func (mtf *Mtf) getAllOb2Action() {
- mtf.RegisterAction(MsgGetAllTick, func(mtf *Mtf, msg *Message) {
- ob2 := mtf.getAllOb2()
- msg.Data = ob2
- })
- }
- func (mtf *Mtf) getInstument(id int64) *response.Instrument {
- data, ok := mtf.instrument[id]
- if ok {
- return data
- }
- return nil
- }
- func (mtf *Mtf) cloneAccountAction() {
- mtf.RegisterAction(MsgCloneAccount, func(mtf *Mtf, msg *Message) {
- isCopyInsts := msg.Data.(bool)
- info := &AccountInfo{}
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("cloneAccountAction::accout not init")
- return
- }
- if isCopyInsts {
- info.ob2 = mtf.getAllOb2()
- info.erate = mtf.getAllRate()
- }
- orders := &response.Orders{}
- orders.Data = account.orderList.GetOrders(true)
- orders.HasMoreResults = false
- positions := &response.Positions{}
- positions.Data = account.positionList.GetPositions(true)
- positions.HasMoreResults = false
- info.accountState = account.GetState()
- info.orders = orders
- info.positions = positions
- msg.Data = info
- })
- }
- func (mtf *Mtf) getAccountAction() {
- mtf.RegisterAction(MsgGetAccount, func(mtf *Mtf, msg *Message) {
- account := mtf.GetAccount(msg.ClientId)
- if account == nil {
- msg.Err = errors.New("getAccountAction::accout not init")
- return
- }
- msg.Data = account.GetState()
- })
- }
- func (mtf *Mtf) getInstumentAction() {
- mtf.RegisterAction(MsgGetInstument, func(mtf *Mtf, msg *Message) {
- data := msg.Data.(int64)
- ret := mtf.getInstument(data)
- msg.Data = ret
- })
- }
- func (mtf *Mtf) getAllInstrument(iscopy bool) (ret []*response.Instrument) {
- for _, v := range mtf.instrument {
- if iscopy {
- tmp := *v
- ret = append(ret, &tmp)
- }
- ret = append(ret, v)
- }
- return ret
- }
- //Start 中的所有函数必须是非阻塞的
- func (mtf *Mtf) Start() {
- //log.Println("mtf is ready")
- for {
- msg, ok := <-mtf.chrecv
- if !ok || msg == nil {
- break
- }
- //关闭mtf
- if msg.Type == MsgShutDown {
- mtf.Close()
- replyMessage(msg)
- break
- }
- action := mtf.msgAction[msg.Type]
- if action == nil {
- msg.Err = ErrNoAction
- replyMessage(msg) //向客户端回复
- // log.Println("[MTF_ACTION_ERROR]", msg)
- continue
- }
- if msg.Type != MsgSetTick && msg.Type != MsgSetExchangeRate && msg.Type != MsgMtfBindPublic && msg.Type != MsgMtfBindPrivate {
- // log.Println("[mtf_action_ok]", msg)
- }
- action(mtf, msg)
- if msg.Err != nil {
- // log.Println(msg.Err, msg.Data)
- }
- replyMessage(msg) //向客户端回复
- }
- // log.Println("mtf is closed")
- }
- func (mtf *Mtf) Close() error {
- mtf.chrecv <- nil
- close(mtf.chrecv)
- return nil
- }
- func (mtf *Mtf) Send(msg *Message) {
- if debug {
- msg.SendTime = getTime()
- }
- mtf.chrecv <- msg
- }
- func (mtf *Mtf) RegisterAction(ty int, cb func(mtf *Mtf, msg *Message)) {
- mtf.msgAction[ty] = cb
- }
|