|
- package lmaxapi
- import "unsafe"
- import "time"
- import "sync/atomic"
- import "log"
- import "tickserver/api/lmaxapi/request"
- import "tickserver/api/lmaxapi/response"
- import "errors"
- var debug = true
- type MtfClient struct {
- mtf *Mtf
- clientId int64
- chrecv chan *Message
- isFullClose bool
- timeout time.Duration
- refCount int64
- stoptick chan int
- count int64
- }
- var ErrChanFull = errors.New("chan is full")
- //fullClose 这个参数很重要,如果设置为 true,那么send 函数完全异步,否则就是有可能
- //发生等待的情况。
- func NewMtfClient(mtf *Mtf, timeout time.Duration, isFullClose bool, bufferCount int) *MtfClient {
- client := &MtfClient{}
- client.mtf = mtf
- client.chrecv = make(chan *Message, bufferCount)
- client.isFullClose = isFullClose
- client.timeout = timeout
- client.clientId = int64(uintptr(unsafe.Pointer(client)))
- if debug {
- client.stoptick = make(chan int, 1)
- //在调试状态下可以打开,看看是否有timeout
- go client.countStat()
- }
- return client
- }
- func (this *MtfClient) countStat() {
- ticker := time.NewTicker(time.Second)
- lastcount := int64(0)
- for {
- select {
- case <-ticker.C:
- if lastcount > 0 {
- count := atomic.LoadInt64(&this.count) - lastcount
- tracelog.Println("send count:", count)
- }
- lastcount = atomic.LoadInt64(&this.count)
- case <-this.stoptick:
- return
- }
- }
- log.Println("stop count")
- }
- func (this *MtfClient) IncRef() int64 {
- return atomic.AddInt64(&this.refCount, 1)
- }
- func (this *MtfClient) DecRef() int64 {
- return atomic.AddInt64(&this.refCount, -1)
- }
- func (this *MtfClient) GetRef() int64 {
- return atomic.LoadInt64(&this.refCount)
- }
- func (this *MtfClient) GetMtf() *Mtf {
- return this.mtf
- }
- func (this *MtfClient) SetMtf(mtf *Mtf) {
- this.mtf = mtf
- }
- func (this *MtfClient) State() {
- }
- func (this *MtfClient) Recv(cb func(msg *Message)) {
- //log.Println("client is ready", this.GetId())
- for {
- msg, ok := <-this.chrecv
- if !ok { //chan is closed
- break
- }
- if cb != nil {
- cb(msg)
- }
- }
- // log.Println("client is stop", this.GetId())
- }
- func (this *MtfClient) RecvMulti(cb func(msg []*Message)) {
- //log.Println("client is ready, multi", this.GetId())
- msgs := make([]*Message, 1024)
- for {
- n, err := this.read(msgs)
- if err != nil {
- if n > 0 {
- if cb != nil {
- cb(msgs[:n])
- }
- }
- break
- }
- if cb != nil {
- cb(msgs[:n])
- }
- }
- // log.Println("client is stop, multi", this.GetId())
- }
- func (s *MtfClient) read(buf []*Message) (int, error) {
- var i = 1
- var ok bool
- buf[0], ok = <-s.chrecv
- if !ok {
- return 0, errors.New("chan is closed")
- }
- for {
- if i == len(buf) {
- return i, nil
- }
- select {
- case data, ok := <-s.chrecv:
- if !ok {
- return i, errors.New("chan is closed")
- }
- buf[i] = data
- i++
- default:
- return i, nil
- }
- }
- panic("nerver reach")
- }
- func (this *MtfClient) GetId() int64 {
- return this.clientId
- }
- func (this *MtfClient) SetId(id int64) {
- this.clientId = id
- }
- func (this *MtfClient) Send(data interface{}) error {
- msg := data.(*Message)
- select {
- case this.chrecv <- msg:
- default:
- // log.Println("send message block")
- if this.isFullClose {
- return ErrChanFull
- } else {
- this.chrecv <- msg
- }
- }
- return nil
- }
- func (this *MtfClient) Close() error {
- if this.GetRef() != 0 {
- return errors.New("ref count > 0")
- }
- msg := this.SendMessage(MsgClose, nil)
- if msg.Err != nil {
- return msg.Err
- }
- close(this.chrecv)
- this.stoptick <- 1
- // log.Println("mtf client close success.", this.GetId())
- return nil
- }
- //连接到mtf
- func (this *MtfClient) ConnectPrivate() error {
- _, err := this.sendMessage(MsgMtfBindPrivate, this)
- return err
- }
- func (this *MtfClient) ConnectPublic() error {
- _, err := this.sendMessage(MsgMtfBindPublic, this)
- return err
- }
- func (this *MtfClient) SetAccount(account *response.AccountStateEvent) error {
- _, err := this.sendMessage(MsgSetAccount, account)
- return err
- }
- func (this *MtfClient) SetAccountDetails(account *response.AccountDetails) error {
- _, err := this.sendMessage(MsgSetAccountDetails, account)
- return err
- }
- func (this *MtfClient) CancelOrder(req *request.CancelOrderRequest) error {
- _, err := this.sendMessage(MsgCancelOrder, req)
- return err
- }
- func (this *MtfClient) CloseOrder(req *request.ClosingOrderRequest) error {
- _, err := this.sendMessage(MsgCloseOrder, req)
- return err
- }
- func (this *MtfClient) AmendOrder(req *request.AmendStopsOrderRequest) error {
- _, err := this.sendMessage(MsgAmendOrder, req)
- return err
- }
- func (this *MtfClient) PlaceOrder(req *request.OrderRequest) error {
- _, err := this.sendMessage(MsgPlaceOrder, req)
- return err
- }
- func (this *MtfClient) SendMessage(msgType int, data interface{}) *Message {
- //准备
- clientid := this.GetId()
- msg := NewMessage(msgType, clientid, data, 0)
- if msgType == MsgLog {
- msg.Flag |= NeedLog
- }
- msg.Ch = make(chan *Message, 5)
- return this.SendMessage2(msg)
- }
- func (this *MtfClient) SendMessage2(msg *Message) *Message {
- this.mtf.Send(msg)
- //接受
- recvmsg := <-msg.Ch
- if debug {
- atomic.AddInt64(&this.count, 1)
- recvmsg.RecvTime = getTime()
- if recvmsg.RecvTime-recvmsg.SendTime > int64(this.timeout) {
- log.Println("msg timeout", msg)
- }
- }
- return recvmsg
- }
- func (this *MtfClient) SendMessageAsyn2(msg *Message) *Message {
- msg.Flag |= SendAsyn
- this.mtf.Send(msg)
- return msg
- }
- func (this *MtfClient) SendMessageAsyn(msgType int, data interface{}) *Message {
- //准备
- clientid := this.GetId()
- msg := NewMessage(msgType, clientid, data, 0)
- return this.SendMessageAsyn2(msg)
- }
- func (this *MtfClient) sendMessage(msgType int, data interface{}) (interface{}, error) {
- msg := this.SendMessage(msgType, data)
- if msg.Err != nil {
- return msg.Data, msg.Err
- }
- return msg.Data, nil
- }
- //更新tick, 这个操作是最频繁的
- func (this *MtfClient) UpdateTick(event *TickEvent) *AccountUpdated {
- updated, err := this.sendMessage(MsgSetTick, event)
- if err != nil {
- return nil
- }
- return updated.(*AccountUpdated)
- }
- func (this *MtfClient) CloneAccount(copyInstinfo bool) *AccountInfo {
- data, err := this.sendMessage(MsgCloneAccount, copyInstinfo)
- if err != nil {
- return nil
- }
- return data.(*AccountInfo)
- }
- func (this *MtfClient) GetOb2(id int64) *response.OrderBookEvent {
- data, err := this.sendMessage(MsgGetTick, id)
- if err != nil {
- return nil
- }
- return data.(*response.OrderBookEvent)
- }
- func (this *MtfClient) GetAllOb2() []*response.OrderBookEvent {
- data, err := this.sendMessage(MsgGetAllTick, nil)
- if err != nil {
- return nil
- }
- return data.([]*response.OrderBookEvent)
- }
- func (this *MtfClient) SetPosition(event *response.PositionEvent) error {
- _, err := this.sendMessage(MsgSetPosition, event)
- return err
- }
- func (this *MtfClient) SetOrder(event *response.OrderEvent) error {
- _, err := this.sendMessage(MsgSetOrder, event)
- return err
- }
- func (this *MtfClient) SetExecution(event *response.ExecutionEvent) error {
- _, err := this.sendMessage(MsgSetExecution, event)
- return err
- }
- func (this *MtfClient) Init(event *InitEvent) *AccountUpdated {
- updated, err := this.sendMessage(MsgInit, event)
- if err != nil {
- return nil
- }
- return updated.(*AccountUpdated)
- }
- func (this *MtfClient) SetOneExecution(event *OneExecutionEvent) *AccountUpdated {
- updated, err := this.sendMessage(MsgSetOneExecution, event)
- if err != nil {
- return nil
- }
- return updated.(*AccountUpdated)
- }
- func (this *MtfClient) GetPosition(id int64) *response.PositionEvent {
- data, err := this.sendMessage(MsgGetPosition, id)
- if err != nil {
- return nil
- }
- return data.(*response.PositionEvent)
- }
- func (this *MtfClient) GetPositions(iscopy bool) []*response.PositionEvent {
- data, err := this.sendMessage(MsgGetPositions, iscopy)
- if err != nil {
- return nil
- }
- return data.([]*response.PositionEvent)
- }
- func (this *MtfClient) GetOrders(iscopy bool) []*response.OrderEvent {
- data, err := this.sendMessage(MsgGetOrders, iscopy)
- if err != nil {
- return nil
- }
- return data.([]*response.OrderEvent)
- }
- func (this *MtfClient) SetRejected(event *response.InstructionRejectedEvent) error {
- _, err := this.sendMessage(MsgSetRejected, event)
- return err
- }
- //货币对列表
- //暂时不做更新
- func (this *MtfClient) SetInstrument(event *response.Instrument) error {
- _, err := this.sendMessage(MsgSetInstrument, event)
- return err
- }
- func (this *MtfClient) SetObStatus(event *response.OrderBookStatusEvent) error {
- _, err := this.sendMessage(MsgSetObStatus, event)
- return err
- }
- //暂时不做更新
- func (this *MtfClient) SetExchangeRate(event *RateEvent) *AccountUpdated {
- updated, err := this.sendMessage(MsgSetExchangeRate, event)
- if err != nil {
- return nil
- }
- return updated.(*AccountUpdated)
- }
- func (this *MtfClient) GetInstrument(id int64) *response.Instrument {
- data, err := this.sendMessage(MsgGetInstrument, id)
- if err != nil {
- return nil
- }
- return data.(*response.Instrument)
- }
- func (this *MtfClient) GetInstruments(iscopy bool) []*response.Instrument {
- data, err := this.sendMessage(MsgGetInstruments, iscopy)
- if err != nil {
- return nil
- }
- return data.([]*response.Instrument)
- }
- func (this *MtfClient) GetAccount() *response.AccountStateEvent {
- data, err := this.sendMessage(MsgGetAccount, nil)
- if err != nil {
- return nil
- }
- return data.(*response.AccountStateEvent)
- }
|