package lmaxapi import "unsafe" import "time" import "sync/atomic" import "log" import "tickserver/api/lmaxapi/request" import "tickserver/api/lmaxapi/response" import "errors" var debug = true type MtfClient struct { mtf *Mtf clientId int64 chrecv chan *Message isFullClose bool timeout time.Duration refCount int64 stoptick chan int count int64 } var ErrChanFull = errors.New("chan is full") //fullClose 这个参数很重要,如果设置为 true,那么send 函数完全异步,否则就是有可能 //发生等待的情况。 func NewMtfClient(mtf *Mtf, timeout time.Duration, isFullClose bool, bufferCount int) *MtfClient { client := &MtfClient{} client.mtf = mtf client.chrecv = make(chan *Message, bufferCount) client.isFullClose = isFullClose client.timeout = timeout client.clientId = int64(uintptr(unsafe.Pointer(client))) if debug { client.stoptick = make(chan int, 1) //在调试状态下可以打开,看看是否有timeout go client.countStat() } return client } func (this *MtfClient) countStat() { ticker := time.NewTicker(time.Second) lastcount := int64(0) for { select { case <-ticker.C: if lastcount > 0 { count := atomic.LoadInt64(&this.count) - lastcount tracelog.Println("send count:", count) } lastcount = atomic.LoadInt64(&this.count) case <-this.stoptick: return } } log.Println("stop count") } func (this *MtfClient) IncRef() int64 { return atomic.AddInt64(&this.refCount, 1) } func (this *MtfClient) DecRef() int64 { return atomic.AddInt64(&this.refCount, -1) } func (this *MtfClient) GetRef() int64 { return atomic.LoadInt64(&this.refCount) } func (this *MtfClient) GetMtf() *Mtf { return this.mtf } func (this *MtfClient) SetMtf(mtf *Mtf) { this.mtf = mtf } func (this *MtfClient) State() { } func (this *MtfClient) Recv(cb func(msg *Message)) { //log.Println("client is ready", this.GetId()) for { msg, ok := <-this.chrecv if !ok { //chan is closed break } if cb != nil { cb(msg) } } // log.Println("client is stop", this.GetId()) } func (this *MtfClient) RecvMulti(cb func(msg []*Message)) { //log.Println("client is ready, multi", this.GetId()) msgs := make([]*Message, 1024) for { n, err := this.read(msgs) if err != nil { if n > 0 { if cb != nil { cb(msgs[:n]) } } break } if cb != nil { cb(msgs[:n]) } } // log.Println("client is stop, multi", this.GetId()) } func (s *MtfClient) read(buf []*Message) (int, error) { var i = 1 var ok bool buf[0], ok = <-s.chrecv if !ok { return 0, errors.New("chan is closed") } for { if i == len(buf) { return i, nil } select { case data, ok := <-s.chrecv: if !ok { return i, errors.New("chan is closed") } buf[i] = data i++ default: return i, nil } } panic("nerver reach") } func (this *MtfClient) GetId() int64 { return this.clientId } func (this *MtfClient) SetId(id int64) { this.clientId = id } func (this *MtfClient) Send(data interface{}) error { msg := data.(*Message) select { case this.chrecv <- msg: default: // log.Println("send message block") if this.isFullClose { return ErrChanFull } else { this.chrecv <- msg } } return nil } func (this *MtfClient) Close() error { if this.GetRef() != 0 { return errors.New("ref count > 0") } msg := this.SendMessage(MsgClose, nil) if msg.Err != nil { return msg.Err } close(this.chrecv) this.stoptick <- 1 // log.Println("mtf client close success.", this.GetId()) return nil } //连接到mtf func (this *MtfClient) ConnectPrivate() error { _, err := this.sendMessage(MsgMtfBindPrivate, this) return err } func (this *MtfClient) ConnectPublic() error { _, err := this.sendMessage(MsgMtfBindPublic, this) return err } func (this *MtfClient) SetAccount(account *response.AccountStateEvent) error { _, err := this.sendMessage(MsgSetAccount, account) return err } func (this *MtfClient) SetAccountDetails(account *response.AccountDetails) error { _, err := this.sendMessage(MsgSetAccountDetails, account) return err } func (this *MtfClient) CancelOrder(req *request.CancelOrderRequest) error { _, err := this.sendMessage(MsgCancelOrder, req) return err } func (this *MtfClient) CloseOrder(req *request.ClosingOrderRequest) error { _, err := this.sendMessage(MsgCloseOrder, req) return err } func (this *MtfClient) AmendOrder(req *request.AmendStopsOrderRequest) error { _, err := this.sendMessage(MsgAmendOrder, req) return err } func (this *MtfClient) PlaceOrder(req *request.OrderRequest) error { _, err := this.sendMessage(MsgPlaceOrder, req) return err } func (this *MtfClient) SendMessage(msgType int, data interface{}) *Message { //准备 clientid := this.GetId() msg := NewMessage(msgType, clientid, data, 0) if msgType == MsgLog { msg.Flag |= NeedLog } msg.Ch = make(chan *Message, 5) return this.SendMessage2(msg) } func (this *MtfClient) SendMessage2(msg *Message) *Message { this.mtf.Send(msg) //接受 recvmsg := <-msg.Ch if debug { atomic.AddInt64(&this.count, 1) recvmsg.RecvTime = getTime() if recvmsg.RecvTime-recvmsg.SendTime > int64(this.timeout) { log.Println("msg timeout", msg) } } return recvmsg } func (this *MtfClient) SendMessageAsyn2(msg *Message) *Message { msg.Flag |= SendAsyn this.mtf.Send(msg) return msg } func (this *MtfClient) SendMessageAsyn(msgType int, data interface{}) *Message { //准备 clientid := this.GetId() msg := NewMessage(msgType, clientid, data, 0) return this.SendMessageAsyn2(msg) } func (this *MtfClient) sendMessage(msgType int, data interface{}) (interface{}, error) { msg := this.SendMessage(msgType, data) if msg.Err != nil { return msg.Data, msg.Err } return msg.Data, nil } //更新tick, 这个操作是最频繁的 func (this *MtfClient) UpdateTick(event *TickEvent) *AccountUpdated { updated, err := this.sendMessage(MsgSetTick, event) if err != nil { return nil } return updated.(*AccountUpdated) } func (this *MtfClient) CloneAccount(copyInstinfo bool) *AccountInfo { data, err := this.sendMessage(MsgCloneAccount, copyInstinfo) if err != nil { return nil } return data.(*AccountInfo) } func (this *MtfClient) GetOb2(id int64) *response.OrderBookEvent { data, err := this.sendMessage(MsgGetTick, id) if err != nil { return nil } return data.(*response.OrderBookEvent) } func (this *MtfClient) GetAllOb2() []*response.OrderBookEvent { data, err := this.sendMessage(MsgGetAllTick, nil) if err != nil { return nil } return data.([]*response.OrderBookEvent) } func (this *MtfClient) SetPosition(event *response.PositionEvent) error { _, err := this.sendMessage(MsgSetPosition, event) return err } func (this *MtfClient) SetOrder(event *response.OrderEvent) error { _, err := this.sendMessage(MsgSetOrder, event) return err } func (this *MtfClient) SetExecution(event *response.ExecutionEvent) error { _, err := this.sendMessage(MsgSetExecution, event) return err } func (this *MtfClient) Init(event *InitEvent) *AccountUpdated { updated, err := this.sendMessage(MsgInit, event) if err != nil { return nil } return updated.(*AccountUpdated) } func (this *MtfClient) SetOneExecution(event *OneExecutionEvent) *AccountUpdated { updated, err := this.sendMessage(MsgSetOneExecution, event) if err != nil { return nil } return updated.(*AccountUpdated) } func (this *MtfClient) GetPosition(id int64) *response.PositionEvent { data, err := this.sendMessage(MsgGetPosition, id) if err != nil { return nil } return data.(*response.PositionEvent) } func (this *MtfClient) GetPositions(iscopy bool) []*response.PositionEvent { data, err := this.sendMessage(MsgGetPositions, iscopy) if err != nil { return nil } return data.([]*response.PositionEvent) } func (this *MtfClient) GetOrders(iscopy bool) []*response.OrderEvent { data, err := this.sendMessage(MsgGetOrders, iscopy) if err != nil { return nil } return data.([]*response.OrderEvent) } func (this *MtfClient) SetRejected(event *response.InstructionRejectedEvent) error { _, err := this.sendMessage(MsgSetRejected, event) return err } //货币对列表 //暂时不做更新 func (this *MtfClient) SetInstrument(event *response.Instrument) error { _, err := this.sendMessage(MsgSetInstrument, event) return err } func (this *MtfClient) SetObStatus(event *response.OrderBookStatusEvent) error { _, err := this.sendMessage(MsgSetObStatus, event) return err } //暂时不做更新 func (this *MtfClient) SetExchangeRate(event *RateEvent) *AccountUpdated { updated, err := this.sendMessage(MsgSetExchangeRate, event) if err != nil { return nil } return updated.(*AccountUpdated) } func (this *MtfClient) GetInstrument(id int64) *response.Instrument { data, err := this.sendMessage(MsgGetInstrument, id) if err != nil { return nil } return data.(*response.Instrument) } func (this *MtfClient) GetInstruments(iscopy bool) []*response.Instrument { data, err := this.sendMessage(MsgGetInstruments, iscopy) if err != nil { return nil } return data.([]*response.Instrument) } func (this *MtfClient) GetAccount() *response.AccountStateEvent { data, err := this.sendMessage(MsgGetAccount, nil) if err != nil { return nil } return data.(*response.AccountStateEvent) }