package lmaxapi //管理整体的一个结构 //接受事件,更新 //接受事件,fetch import "tickserver/api/lmaxapi/response" import "errors" import "log" import "crypto/rand" import "encoding/hex" import "sync" func getId() string { var b [5]byte rand.Read(b[:]) return hex.EncodeToString(b[:]) } var ErrNoAction = errors.New("ErrNoAction") type Mtf struct { ob2 map[int64]*response.OrderBookEvent obstatus map[int64]*response.OrderBookStatusEvent instrument map[int64]*response.Instrument erate map[string]*response.ExchangeRateEvent accounts map[int64]*Account accountIds map[int64]int64 clients map[int64]*MtfClient chrecv chan *Message server *ChanServer msgAction []func(mtf *Mtf, msg *Message) //正在处理中的 pendding map[string]*Message accountIdToClient map[int64]*MtfClient mu sync.Mutex } const ( AdminClient = iota LogClient TradeClient MatchClient ) func NewMtf() *Mtf { mtf := &Mtf{} mtf.ob2 = make(map[int64]*response.OrderBookEvent) mtf.instrument = make(map[int64]*response.Instrument) mtf.erate = make(map[string]*response.ExchangeRateEvent) mtf.accounts = make(map[int64]*Account) mtf.accountIds = make(map[int64]int64) mtf.obstatus = make(map[int64]*response.OrderBookStatusEvent) mtf.clients = make(map[int64]*MtfClient) mtf.accountIdToClient = make(map[int64]*MtfClient) mtf.server = NewChanServer() mtf.chrecv = make(chan *Message, 10240) mtf.msgAction = make([]func(mtf *Mtf, msg *Message), MsgCount) //注册事件回调函数 mtf.bindPrivateAction() mtf.bindPublicAction() mtf.getAccountAction() mtf.getExchangeRateAction() mtf.getOb2Action() mtf.getAllOb2Action() mtf.getInstumentAction() mtf.getInstrumentsAction() mtf.getOrdersAction() mtf.getPositionsAction() mtf.getPositionAction() mtf.getInstrumentAction() mtf.cloneAccountAction() mtf.setInitAction() mtf.setOneExecutionAction() mtf.setTickAction() mtf.setInstrumentAction() mtf.setExchangeRateAction() mtf.setAccountAction() mtf.setPositionAction() mtf.setOrderAction() mtf.setExecutionAction() mtf.setRejectedAction() mtf.setObStatusAction() mtf.echoAction() mtf.closeAction() mtf.setAccountDetailAction() return mtf } //为了方便用户通过clientId 查询client ,这里加了一个lock func (mtf *Mtf) Bind(ty int, id int64, client *MtfClient) { mtf.mu.Lock() defer mtf.mu.Unlock() mtf.clients[id] = client mtf.server.Bind(ty, id, client) } func (mtf *Mtf) AddAccount(account *Account, clientId int64) { mtf.mu.Lock() defer mtf.mu.Unlock() accountId := account.GetId() //三个映射表:可以根据account id 查询 client //可以根据client id 查询 account mtf.accountIds[clientId] = accountId mtf.accountIdToClient[accountId] = mtf.clients[clientId] mtf.accounts[accountId] = account } func (mtf *Mtf) DeleteAccount(clientId int64) { mtf.mu.Lock() defer mtf.mu.Unlock() accountId, ok := mtf.accountIds[clientId] if ok { delete(mtf.accounts, accountId) delete(mtf.accountIdToClient, accountId) delete(mtf.accountIds, clientId) } } func (mtf *Mtf) GetAccount(clientId int64) *Account { mtf.mu.Lock() defer mtf.mu.Unlock() id, ok := mtf.accountIds[clientId] if ok { return mtf.accounts[id] } return nil } func (mtf *Mtf) GetAccountById(accountId int64) *Account { mtf.mu.Lock() defer mtf.mu.Unlock() account, ok := mtf.accounts[accountId] if ok { return account } return nil } var ErrClientNotFound = errors.New("ErrClientNotFound") //可以被多线程查询 func (mtf *Mtf) GetClient(clientId int64) (*MtfClient, error) { mtf.mu.Lock() defer mtf.mu.Unlock() if client, ok := mtf.clients[clientId]; ok { return client, nil } return nil, ErrClientNotFound } func (mtf *Mtf) GetClientByAccountId(accountId int64) (*MtfClient, error) { mtf.mu.Lock() defer mtf.mu.Unlock() if client, ok := mtf.accountIdToClient[accountId]; ok { return client, nil } return nil, ErrClientNotFound } func (mtf *Mtf) setTick(event *TickEvent, accountId int64) (ret *AccountUpdated) { if len(event.ob2.AskPrices) == 0 || len(event.ob2.BidPrices) == 0 { return nil } mtf.ob2[event.ob2.InstrumentId] = event.ob2 for _, account := range mtf.accounts { if account.GetId() == accountId { ret = account.UpdateTick(event) continue } account.UpdateTick(event) } return } func (mtf *Mtf) setInitAction() { mtf.RegisterAction(MsgInit, func(mtf *Mtf, msg *Message) { data := msg.Data.(*InitEvent) account := mtf.GetAccount(msg.ClientId) if account == nil { account = NewAccount(mtf, nil, nil, msg.ClientId) // PrintStruct(data) // PrintStruct(data.Account) account.SetState(data.Account) account.SetDetail(data.AccountDetail) mtf.AddAccount(account, msg.ClientId) } else { account.SetState(data.Account) account.SetDetail(data.AccountDetail) } //初始化 result := account.Init(data) if result == nil { msg.Err = errors.New("err init") return } result.Req = msg.Data msg.Data = result }) } //MsgSetOneExecution func (mtf *Mtf) setOneExecutionAction() { mtf.RegisterAction(MsgSetOneExecution, func(mtf *Mtf, msg *Message) { data := msg.Data.(*OneExecutionEvent) account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("err get account MsgSetOneExecution") return } //初始化 result := account.SetOneExecution(data) if result == nil { msg.Err = errors.New("err update account MsgSetOneExecution") return } result.Req = msg.Data msg.Data = result mtf.server.SendPrivate(msg) }) } func (mtf *Mtf) SendPrivate(msg *Message) error { return mtf.server.SendPrivate(msg) } type Instructioner interface { GetId() string SetId(string) Clone() interface{} SetAccountId(int64) } func (mtf *Mtf) setTickAction() { mtf.RegisterAction(MsgSetTick, func(mtf *Mtf, msg *Message) { account := mtf.GetAccount(msg.ClientId) data := msg.Data.(*TickEvent) msg.Flag |= Public //特殊情况,没有account的时候,不需要报告account的情况 result := &AccountUpdated{} if account == nil { mtf.setTick(data, 0) if len(data.ob2.AskPrices) == 0 || len(data.ob2.BidPrices) == 0 { msg.Err = errors.New("err tick") } } else { result = mtf.setTick(data, account.GetId()) if result == nil { msg.Err = errors.New("err tick") } } result.Req = msg.Data msg.Data = result mtf.server.SendPublic(msg) }) } //这个函数会产生等待,所以一般不会使用,mtf 要使用异步版本 func (mtf *Mtf) SendMessage(clientId int64, msgType int, data interface{}) *Message { //准备 msg := NewMessage(msgType, clientId, data, 0) msg.Ch = make(chan *Message, 1) mtf.Send(msg) //接受 recvmsg := <-msg.Ch return recvmsg } func (mtf *Mtf) SendMessageAsyn(clientId int64, msgType int, data interface{}) *Message { msg := NewMessage(msgType, clientId, data, 0) msg.Flag |= SendAsyn //发送 mtf.Send(msg) //接受 return msg } func (mtf *Mtf) setInstrument(event *response.Instrument) { mtf.instrument[event.Id] = event } func (mtf *Mtf) setObStatus(event *response.OrderBookStatusEvent) { mtf.obstatus[event.InstrumentId] = event } func (mtf *Mtf) setInstrumentAction() { mtf.RegisterAction(MsgSetInstrument, func(mtf *Mtf, msg *Message) { data := msg.Data.(*response.Instrument) mtf.setInstrument(data) msg.Flag |= Public mtf.server.SendPublic(msg) }) } func (mtf *Mtf) setObStatusAction() { mtf.RegisterAction(MsgSetObStatus, func(mtf *Mtf, msg *Message) { data := msg.Data.(*response.OrderBookStatusEvent) mtf.setObStatus(data) msg.Flag |= Public mtf.server.SendPublic(msg) }) } func (mtf *Mtf) bindPrivateAction() { mtf.RegisterAction(MsgMtfBindPrivate, func(mtf *Mtf, msg *Message) { data := msg.Data.(*MtfClient) mtf.Bind(Private, msg.ClientId, data) }) } func (mtf *Mtf) bindPublicAction() { mtf.RegisterAction(MsgMtfBindPublic, func(mtf *Mtf, msg *Message) { data := msg.Data.(*MtfClient) mtf.Bind(Public, msg.ClientId, data) }) } func (mtf *Mtf) setExchangeRate(event *RateEvent, accountId int64) (ret *AccountUpdated) { mtf.erate[event.erate.From+"/"+event.erate.To] = event.erate for _, account := range mtf.accounts { if account.GetId() == accountId { ret = account.UpdateRate(event) continue } account.UpdateRate(event) } return } func (mtf *Mtf) setExchangeRateAction() { mtf.RegisterAction(MsgSetExchangeRate, func(mtf *Mtf, msg *Message) { data := msg.Data.(*RateEvent) account := mtf.GetAccount(msg.ClientId) msg.Flag |= Public result := &AccountUpdated{} if account == nil { mtf.setExchangeRate(data, 0) } else { result = mtf.setExchangeRate(data, account.GetId()) if result == nil { msg.Err = errors.New("err erate") } } result.Req = msg.Data msg.Data = result mtf.server.SendPublic(msg) }) } func (mtf *Mtf) getAllRate() []*response.ExchangeRateEvent { var rates []*response.ExchangeRateEvent for _, rate := range mtf.erate { rates = append(rates, rate) } return rates } func (mtf *Mtf) getExchangeRate(symbol string) *response.ExchangeRateEvent { if rate, ok := mtf.erate[symbol+"/"+"USD"]; ok { return rate } return nil } func (mtf *Mtf) getExchangeRateAction() { mtf.RegisterAction(MsgGetExchangeRate, func(mtf *Mtf, msg *Message) { data := msg.Data.(string) rate := mtf.getExchangeRate(data) msg.Data = rate }) } func (mtf *Mtf) getOrdersAction() { mtf.RegisterAction(MsgGetOrders, func(mtf *Mtf, msg *Message) { iscopy := msg.Data.(bool) account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("getOrdersAction::accout not init") return } data := account.orderList.GetOrders(iscopy) msg.Data = data }) } func (mtf *Mtf) getPositionsAction() { mtf.RegisterAction(MsgGetPositions, func(mtf *Mtf, msg *Message) { iscopy := msg.Data.(bool) account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("getPositionsAction::accout not init") return } data := account.positionList.GetPositions(iscopy) msg.Data = data }) } func (mtf *Mtf) getPositionAction() { mtf.RegisterAction(MsgGetPosition, func(mtf *Mtf, msg *Message) { id := msg.Data.(int64) account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("getPositionAction::accout not init") return } data := account.positionList.GetPosition(id) msg.Data = data }) } func (mtf *Mtf) echoAction() { mtf.RegisterAction(MsgEcho, func(mtf *Mtf, msg *Message) { }) } func replyMessage(msg *Message) { select { case msg.Ch <- msg: //tracelog.Println("reply", msg.Id) default: log.Println("reply msg error", msg) } } func (mtf *Mtf) setRejectedAction() { mtf.RegisterAction(MsgSetRejected, func(mtf *Mtf, msg *Message) { data := msg.Data.(*response.InstructionRejectedEvent) account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("setRejectedAction::accout not init") return } account.SetRejected(data) mtf.server.SendPrivate(msg) }) } func (mtf *Mtf) closeAction() { mtf.RegisterAction(MsgClose, func(mtf *Mtf, msg *Message) { mtf.server.UnBind(Private, msg.ClientId) mtf.server.UnBind(Public, msg.ClientId) }) } func (mtf *Mtf) setAccountAction() { mtf.RegisterAction(MsgSetAccount, func(mtf *Mtf, msg *Message) { data := msg.Data.(*response.AccountStateEvent) account := mtf.GetAccount(msg.ClientId) //单件模式,只可能被设置一次。 if account == nil { log.Println("setAccountAction", msg.ClientId, account, data) account = NewAccount(mtf, nil, nil, msg.ClientId) account.SetState(data) mtf.AddAccount(account, msg.ClientId) } }) } func (mtf *Mtf) setAccountDetailAction() { mtf.RegisterAction(MsgSetAccountDetails, func(mtf *Mtf, msg *Message) { data := msg.Data.(*response.AccountDetails) account := mtf.GetAccount(msg.ClientId) //单件模式,只可能被设置一次。 if account == nil { msg.Err = errors.New("setAccountDetailAction::accout not init") return } account.SetDetail(data) }) } func (mtf *Mtf) setPositionAction() { mtf.RegisterAction(MsgSetPosition, func(mtf *Mtf, msg *Message) { data := msg.Data.(*response.PositionEvent) account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("setPositionAction::accout not init") return } account.SetPosition(data) }) } func (mtf *Mtf) setOrderAction() { mtf.RegisterAction(MsgSetOrder, func(mtf *Mtf, msg *Message) { data := msg.Data.(*response.OrderEvent) account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("setOrderAction::accout not init") return } account.SetOrder(data) }) } //MsgSetExecution func (mtf *Mtf) setExecutionAction() { mtf.RegisterAction(MsgSetExecution, func(mtf *Mtf, msg *Message) { data := msg.Data.(*response.ExecutionEvent) account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("setExecutionAction::accout not init") return } account.SetExecution(data) }) } func (mtf *Mtf) getInstrumentInfo(inst int64) (instrument *response.Instrument, erate *response.ExchangeRateEvent, ob2 *response.OrderBookEvent, err error) { var ok bool instrument, ok = mtf.instrument[inst] if !ok { err = errors.New("order InstrumentId error.") return } if instrument.Currency != "USD" { erate, ok = mtf.erate[instrument.Currency+"/"+"USD"] if !ok { err = errors.New("exchangeRate not found.") return } } ob2, ok = mtf.ob2[inst] if !ok { err = errors.New("lastOrderBookEvent error.") return } return instrument, erate, ob2, nil } func (mtf *Mtf) getOb2(id int64) *response.OrderBookEvent { return mtf.ob2[id] } func (mtf *Mtf) getOb2Action() { mtf.RegisterAction(MsgGetTick, func(mtf *Mtf, msg *Message) { data := msg.Data.(int64) ob2 := mtf.getOb2(data) msg.Data = ob2 }) } func (mtf *Mtf) getInstrumentAction() { mtf.RegisterAction(MsgGetInstrument, func(mtf *Mtf, msg *Message) { data := msg.Data.(int64) inst := mtf.instrument[data] msg.Data = inst }) } func (mtf *Mtf) getInstrumentsAction() { mtf.RegisterAction(MsgGetInstruments, func(mtf *Mtf, msg *Message) { iscopy := msg.Data.(bool) insts := mtf.getAllInstrument(iscopy) msg.Data = insts }) } func (mtf *Mtf) getAllOb2() (ret []*response.OrderBookEvent) { for _, price := range mtf.ob2 { ret = append(ret, price) } return ret } func (mtf *Mtf) getAllOb2Action() { mtf.RegisterAction(MsgGetAllTick, func(mtf *Mtf, msg *Message) { ob2 := mtf.getAllOb2() msg.Data = ob2 }) } func (mtf *Mtf) getInstument(id int64) *response.Instrument { data, ok := mtf.instrument[id] if ok { return data } return nil } func (mtf *Mtf) cloneAccountAction() { mtf.RegisterAction(MsgCloneAccount, func(mtf *Mtf, msg *Message) { isCopyInsts := msg.Data.(bool) info := &AccountInfo{} account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("cloneAccountAction::accout not init") return } if isCopyInsts { info.ob2 = mtf.getAllOb2() info.erate = mtf.getAllRate() } orders := &response.Orders{} orders.Data = account.orderList.GetOrders(true) orders.HasMoreResults = false positions := &response.Positions{} positions.Data = account.positionList.GetPositions(true) positions.HasMoreResults = false info.accountState = account.GetState() info.orders = orders info.positions = positions msg.Data = info }) } func (mtf *Mtf) getAccountAction() { mtf.RegisterAction(MsgGetAccount, func(mtf *Mtf, msg *Message) { account := mtf.GetAccount(msg.ClientId) if account == nil { msg.Err = errors.New("getAccountAction::accout not init") return } msg.Data = account.GetState() }) } func (mtf *Mtf) getInstumentAction() { mtf.RegisterAction(MsgGetInstument, func(mtf *Mtf, msg *Message) { data := msg.Data.(int64) ret := mtf.getInstument(data) msg.Data = ret }) } func (mtf *Mtf) getAllInstrument(iscopy bool) (ret []*response.Instrument) { for _, v := range mtf.instrument { if iscopy { tmp := *v ret = append(ret, &tmp) } ret = append(ret, v) } return ret } //Start 中的所有函数必须是非阻塞的 func (mtf *Mtf) Start() { //log.Println("mtf is ready") for { msg, ok := <-mtf.chrecv if !ok || msg == nil { break } //关闭mtf if msg.Type == MsgShutDown { mtf.Close() replyMessage(msg) break } action := mtf.msgAction[msg.Type] if action == nil { msg.Err = ErrNoAction replyMessage(msg) //向客户端回复 // log.Println("[MTF_ACTION_ERROR]", msg) continue } if msg.Type != MsgSetTick && msg.Type != MsgSetExchangeRate && msg.Type != MsgMtfBindPublic && msg.Type != MsgMtfBindPrivate { // log.Println("[mtf_action_ok]", msg) } action(mtf, msg) if msg.Err != nil { // log.Println(msg.Err, msg.Data) } replyMessage(msg) //向客户端回复 } // log.Println("mtf is closed") } func (mtf *Mtf) Close() error { mtf.chrecv <- nil close(mtf.chrecv) return nil } func (mtf *Mtf) Send(msg *Message) { if debug { msg.SendTime = getTime() } mtf.chrecv <- msg } func (mtf *Mtf) RegisterAction(ty int, cb func(mtf *Mtf, msg *Message)) { mtf.msgAction[ty] = cb }