|
- package lmaxapi
- import (
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "tickserver/api/lmaxapi/request"
- "tickserver/api/lmaxapi/response"
- "tickserver/api/lmaxapi/util"
- "log"
- "net/http"
- "strings"
- "sync"
- "time"
- )
- var tracelog = log.New(ioutil.Discard, "[trace]", log.Ltime)
- type Session struct {
- lmax_url, sessionId string
- running bool
- runLock sync.RWMutex
- mu sync.RWMutex
- hearttick *time.Ticker
- eventHandler *EventHandler
- asynCallWait sync.WaitGroup
- accountDetails *response.AccountDetails
- stream *http.Response
- lastRecvTime time.Time
- orderBook *OrderBook
- heartstop chan int
- }
- func EnableLog(w io.Writer) {
- tracelog = log.New(w, "[trace]", log.Ldate|log.Lmicroseconds)
- }
- func DisableLog() {
- tracelog = log.New(ioutil.Discard, "[trace]", log.Ldate|log.Lmicroseconds)
- }
- func NewSession(url string, sid string, account *response.AccountDetails) *Session {
- //log.Println("NewSession")
- s := Session{lmax_url: url, sessionId: sid, running: false, accountDetails: account}
- s.lastRecvTime = time.Now()
- s.eventHandler = NewEventHandler(&s)
- mtf := NewMtf()
- go mtf.Start()
- s.orderBook = NewOrderBook(mtf)
- s.heartstop = make(chan int)
- return &s
- }
- func (this *Session) Login(username string, password string, productType string) error {
- req := request.NewLoginRequest(username, password, productType)
- session, err := Login(req)
- if err != nil {
- return err
- }
- session.orderBook.Stop()
- this.mu.Lock()
- this.lmax_url = session.lmax_url
- this.sessionId = session.sessionId
- this.accountDetails = session.accountDetails
- this.mu.Unlock()
- this.UpdateRecvTime()
- return nil
- }
- func (this *Session) IsLogin() bool {
- this.mu.Lock()
- defer this.mu.Unlock()
- return this.sessionId != ""
- }
- func (this *Session) GetOrderBook() *OrderBook {
- return this.orderBook
- }
- func (this *Session) LoadAllInstruments(cb func(inst *response.Instrument)) ([]*response.Instrument, error) {
- offset := int64(0)
- var ret []*response.Instrument
- for {
- searchReq := request.NewSearchInstrumentRequest("", offset)
- insList, hasMore, err := this.SearchInstruments(searchReq, nil)
- if err != nil {
- return nil, err
- }
- if cb != nil {
- for _, value := range insList {
- cb(value)
- }
- }
- if !hasMore {
- ret = append(ret, insList...)
- return ret, nil
- }
- ret = append(ret, insList...)
- offset = insList[len(insList)-1].Id
- }
- panic("never reached.")
- }
- func (this *Session) UpdateRecvTime() {
- this.mu.Lock()
- defer this.mu.Unlock()
- this.lastRecvTime = time.Now()
- }
- func (this *Session) LastRecvTime() time.Time {
- this.mu.Lock()
- defer this.mu.Unlock()
- return this.lastRecvTime
- }
- func (this *Session) Logout(callback Callback) error {
- return this.asynCallback(callback, func() error {
- req := request.NewLogoutRequest()
- res, reqerr := this.sessionAction(req, "POST", "Logout")
- if reqerr != nil {
- return reqerr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bufStr := string(buf)
- err := parseOKStatusString(bufStr, "Logout", res.StatusCode)
- return err
- })
- }
- func (this *Session) Subscribe(req IRequest, callback Callback) error {
- return this.asynCallback(callback, func() error {
- res, reqerr := this.sessionAction(req, "POST", "Subscribe")
- if reqerr != nil {
- return reqerr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bufStr := string(buf)
- err := parseOKStatusString(bufStr, "Subscribe", res.StatusCode)
- return err
- })
- }
- func (this *Session) Unsubscribe(req IRequest, callback Callback) error {
- return this.asynCallback(callback, func() error {
- req.(UrlSetter).SetUrl("/secure/unsubscribe")
- res, reqerr := this.sessionAction(req, "POST", "Unsubscribe")
- if reqerr != nil {
- return reqerr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bufStr := string(buf)
- err := parseOKStatusString(bufStr, "Subscribe", res.StatusCode)
- return err
- })
- }
- func (this *Session) SearchInstruments(req IRequest, searchCallback SearchInstrumentCallback) ([]*response.Instrument, bool, error) {
- return this.asynSearchInstrumentCallback(searchCallback, func() ([]*response.Instrument, bool, error) {
- res, reqErr := this.sessionAction(req, "GET", "SearchInstrument")
- if reqErr != nil {
- return nil, false, reqErr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bodyData := string(buf)
- statusErr := parseOKStatusString(bodyData, "SearchInstruments", res.StatusCode)
- if statusErr != nil {
- return nil, false, statusErr
- }
- insts := response.NewInstruments(bodyData)
- return insts.Data, insts.HasMoreResults, nil
- })
- }
- func (this *Session) GetCompletedOrder(req IRequest, callback CompletedOrderCallback) ([]*response.CompletedOrder, string, error) {
- return this.asynGetCompletedOrder(callback, func() ([]*response.CompletedOrder, string, error) {
- res, reqErr := this.sessionAction(req, "GET", "CompletedOrder")
- if reqErr != nil {
- return nil, "", reqErr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bodyData := string(buf)
- statusErr := parseOKStatusString(bodyData, "CompletedOrder", res.StatusCode)
- if statusErr != nil {
- return nil, "", statusErr
- }
- insts, err := response.NewCompletedOrders(bodyData)
- if err != nil {
- return nil, "", err
- }
- if insts.HasMoreResults == false {
- return insts.Data, "", nil
- }
- offset := insts.Data[len(insts.Data)-1].OrderId
- return insts.Data, offset, nil
- })
- }
- func (this *Session) GetAccountStatement(req IRequest, callback AccountStatementCallback) ([]*response.AccountStatement, bool, error) {
- return this.asynGetAccountStatement(callback, func() ([]*response.AccountStatement, bool, error) {
- res, reqErr := this.sessionAction(req, "GET", "AccountStatement")
- if reqErr != nil {
- return nil, false, reqErr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bodyData := string(buf)
- statusErr := parseOKStatusString(bodyData, "AccountStatement", res.StatusCode)
- if statusErr != nil {
- return nil, false, statusErr
- }
- insts, err := response.NewAccountStatements(bodyData)
- if err != nil {
- return nil, false, err
- }
- return insts.Data, insts.HasMoreResults, nil
- })
- }
- func (this *Session) GetOrderTransaction(req IRequest, callback OrderTransactionCallback) ([]*response.OrderTransaction, bool, error) {
- return this.asynGetOrderTransaction(callback, func() ([]*response.OrderTransaction, bool, error) {
- res, reqErr := this.sessionAction(req, "GET", "OrderTransaction")
- if reqErr != nil {
- return nil, false, reqErr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bodyData := string(buf)
- statusErr := parseOKStatusString(bodyData, "OrderTransaction", res.StatusCode)
- if statusErr != nil {
- return nil, false, statusErr
- }
- insts, err := response.NewOrderTransactions(bodyData)
- if err != nil {
- return nil, false, err
- }
- return insts.Data, insts.HasMoreResults, nil
- })
- }
- func (this *Session) GetActivity(req IRequest, callback ActivityCallback) ([]*response.Activity, bool, error) {
- return this.asynGetActivity(callback, func() ([]*response.Activity, bool, error) {
- _, ok := req.(*request.OrderActivityAuditTrailRequest)
- method := "GET"
- action := "Activity"
- if ok {
- method = "POST"
- action = "OrderActivityAuditTrailRequest"
- }
- res, reqErr := this.sessionAction(req, method, action)
- if reqErr != nil {
- return nil, false, reqErr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bodyData := string(buf)
- statusErr := parseOKStatusString(bodyData, action, res.StatusCode)
- if statusErr != nil {
- return nil, false, statusErr
- }
- insts, err := response.NewActivitys(bodyData)
- if err != nil {
- return nil, false, err
- }
- return insts.Data, insts.HasMoreResults, nil
- })
- }
- //compare and set 整个操作要原子
- func (this *Session) Start() error {
- tracelog.Println("begin start")
- this.runLock.Lock()
- if this.running { //如果已经running了
- this.runLock.Unlock()
- return NewOpError("Stream", errors.New("Error stream status,stream is running"), 0, false)
- }
- this.running = true //running 设置成true
- this.runLock.Unlock()
- streamReq := request.NewStreamRequest()
- //start 停止的条件:1. 发生403错误 2. 被外界stop
- for this.Isrunning() {
- res, err := this.sessionAction(streamReq, "POST", "Stream")
- this.runLock.Lock()
- this.stream = res //这里要加锁,stream 会被stop 函数在多线程环境下使用
- this.runLock.Unlock()
- if err != nil {
- this.eventHandler.HandleEventError(err)
- continue
- }
- //检查状态码,这个时候是session 过期了,要重新登录
- if this.stream.StatusCode == 403 {
- this.eventHandler.HandleEventSessionDisconnected()
- // this.Stop()
- return NewOpError("Stream", errors.New("Error ssesion is out of date"), 403, true)
- }
- //处理事件
- tracelog.Println("begin processStreamResponse")
- err = this.processStreamResponse()
- tracelog.Println("end processStreamResponse")
- if err != nil {
- this.eventHandler.HandleEventError(NewOpError("Stream", err, res.StatusCode, true))
- }
- }
- tracelog.Println("end start")
- time.Sleep(time.Second)
- return nil
- }
- func (this *Session) processStreamResponse() error {
- //保证:连接会执行close
- res := this.stream
- defer res.Body.Close()
- buf := make([]byte, 256) //这块内存应该能够应付最大的<events></events>
- var bodyData, nodeName, nodeData string
- var n int
- var err error
- for this.Isrunning() {
- n, err = res.Body.Read(buf)
- if err != nil {
- break
- }
- this.UpdateRecvTime()
- str := string(buf[0:n])
- bodyData = bodyData + str
- for {
- nodeName, nodeData, bodyData = util.ParseXmlNode(bodyData)
- if nodeName == "" {
- break
- }
- switch nodeName {
- case "events":
- index := strings.Index(nodeData, "<header><seq>0</seq></header>")
- if index != -1 {
- this.eventHandler.HandleEventData(nodeData, true)
- } else {
- this.eventHandler.HandleEventData(nodeData, false)
- }
- default:
- // fmt.Println("eventData:", nodeData)
- }
- }
- }
- return err
- }
- func (this *Session) PlaceMarketOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
- return this.order(marketOrder, callback, "PlaceMarketOrder")
- }
- func (this *Session) PlaceLimitOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
- return this.order(marketOrder, callback, "LimitOrder")
- }
- func (this *Session) PlaceStopOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
- return this.order(marketOrder, callback, "StopOrder")
- }
- func (this *Session) CancelOrder(cancelOrder IRequest, callback OrderCallback) (string, error) {
- return this.order(cancelOrder, callback, "CancelOrder")
- }
- func (this *Session) PlaceClosingOrder(order *request.ClosingOrderRequest, callback OrderCallback) (string, error) {
- return this.order(order, callback, "ClosingOrder")
- }
- func (this *Session) PlaceAmendStopsOrder(order *request.AmendStopsOrderRequest, callback OrderCallback) (string, error) {
- return this.order(order, callback, "AmendStopsOrder")
- }
- func (this *Session) order(marketOrder IRequest, callback OrderCallback, op string) (string, error) {
- return this.asynOrderCallback(callback, func() (string, error) {
- bodyData, reqErr := this.sessionActionOkStatus(marketOrder, "POST", op)
- if reqErr != nil {
- return "", reqErr
- }
- return util.ParseXmlNameData(bodyData, []string{"res", "body", "instructionId"}), nil
- })
- }
- func (this *Session) OrderActions(order *OrderActionsRequest, callback OrderCallback) (string, error) {
- return this.order(order, callback, "OrderActions")
- }
- //它应该和Login 一起启动
- //并且随着session的stop而停止
- func (this *Session) HeartbeatTimeout(t time.Duration) {
- req2 := request.NewHeartbeatSubscriptionRequest()
- this.Subscribe(req2, nil)
- first := true
- c := time.NewTicker(t)
- this.runLock.Lock()
- if this.hearttick != nil {
- this.hearttick.Stop()
- //停止原来的heartbeat
- this.heartstop <- 1
- }
- this.hearttick = c
- this.runLock.Unlock()
- go func() {
- tracelog.Println("start heart beat.")
- for {
- select {
- case <-c.C:
- tracelog.Println("heart beat check begin.")
- reciveTime := this.LastRecvTime()
- //第一次不检查,要先让心跳发出去
- tracelog.Println("heart beat offset", time.Now().Sub(reciveTime), reciveTime)
- if !first && time.Now().Sub(reciveTime) > t+1*time.Second {
- err := NewOpError("Stream", errors.New("Heartbeat timeout"), -1, true)
- this.eventHandler.HandleEventError(err)
- }
- first = false
- req3 := request.NewHeartbeatRequest("hello")
- this.RequestHeartbeat(req3, func(str string, err error) {
- tracelog.Println("RequestHeartbeat", str, err)
- })
- tracelog.Println("heart beat check end.")
- case <-this.heartstop:
- tracelog.Println("end heart beat.")
- return
- }
- }
- }()
- }
- func (this *Session) RequestHeartbeat(heartbeatRequest *request.HeartbeatRequest, callback HeartbeatCallback) (string, error) {
- return this.asynHeartbeatCallback(callback, func() (string, error) {
- bodyData, reqErr := this.sessionActionOkStatus(heartbeatRequest, "POST", "Heartbeat")
- if reqErr != nil {
- return "", reqErr
- }
- return util.ParseXmlNameData(bodyData, []string{"res", "body", "token"}), nil
- })
- }
- func (this *Session) RequestHistoricMarketData(request IRequest, callback Callback) error {
- return this.asynCallback(callback, func() error {
- _, err := this.sessionActionOkStatus(request, "POST", "RequestHistorMarketData")
- return err
- })
- }
- func (this *Session) Isrunning() bool {
- this.runLock.RLock()
- defer this.runLock.RUnlock()
- return this.running
- }
- func (this *Session) Stop() {
- this.runLock.Lock()
- defer this.runLock.Unlock()
- if !this.running {
- return
- }
- if this.hearttick != nil {
- this.hearttick.Stop()
- }
- this.running = false
- this.stopStream()
- }
- func (this *Session) stopStream() {
- if this.stream != nil && this.stream.Body != nil {
- //关闭stream 连接
- this.stream.Body.Close()
- }
- }
- //仅仅重启一下stream,不重新登录
- func (this *Session) StopStream() {
- this.runLock.Lock()
- defer this.runLock.Unlock()
- this.stopStream()
- }
- func (this *Session) RegisterAccountStateEvent(cb AccountStateListener) {
- this.eventHandler.OnAccountStateEvent = cb
- }
- func (this *Session) RegisterInitEvent(cb InitListener) {
- this.eventHandler.OnInit = cb
- }
- func (this *Session) RegisterOneExecutionEvent(cb OneExecutionListener) {
- this.eventHandler.OnOneExecution = cb
- }
- func (this *Session) RegisterExecutionEvent(cb ExecutionListener) {
- this.eventHandler.OnExecutionEvent = cb
- }
- func (this *Session) RegisterHeartbeatEvent(cb HeartbeatListener) {
- this.eventHandler.OnHeartbeatEvent = cb
- }
- func (this *Session) RegisterHistoricMarketDataEvent(cb HistoricMarketDataListener) {
- this.eventHandler.OnHistoricEvent = cb
- }
- func (this *Session) RegisterInstructionRejectedEvent(cb InstructionRejectedListener) {
- this.eventHandler.OnInstructionRejected = cb
- }
- func (this *Session) RegisterOrderBookEvent(cb OrderBookListener) {
- this.eventHandler.OnOrderBookEvent = cb
- }
- func (this *Session) RegisterOrderBookStatusEvent(cb OrderBookStatusListener) {
- this.eventHandler.OnOrderBookStatusEvent = cb
- }
- func (this *Session) RegisterOrderEvent(cb OrderListener) {
- this.eventHandler.OnOrderEvent = cb
- }
- func (this *Session) RegisterPositionEvent(cb PositionListener) {
- this.eventHandler.OnPositionEvent = cb
- }
- func (this *Session) RegisterExchangeRateEvent(cb ExchangeRateListener) {
- this.eventHandler.OnExchangeRateEvent = cb
- }
- func (this *Session) RegisterStreamFailureEvent(cb StreamFailureListener) {
- this.eventHandler.OnStreamFailure = cb
- }
- func (this *Session) RegisterSessionDisconnected(cb SessionDisconnectedListener) {
- this.eventHandler.OnSessionDisconnected = cb
- }
- func (this *Session) GetAccountDetails() *response.AccountDetails {
- this.mu.Lock()
- defer this.mu.Unlock()
- return this.accountDetails
- }
- func (this *Session) KeepAlive(d time.Duration) {
- req := &request.KeepAliveRequest{}
- for this.Isrunning() {
- _, err := this.sessionAction(req, "GET", "KeepAlive")
- if err != nil {
- // fmt.Println(err)
- }
- fmt.Println("KeepAlive sleep")
- time.Sleep(d)
- }
- }
- func (this *Session) OpenUrl(url string, callback UrlCallback) (*http.Response, error) {
- return this.asynUrlCallback(callback, func() (*http.Response, error) {
- this.mu.Lock()
- sid := this.sessionId
- this.mu.Unlock()
- res, err := httpOpen(url, sid)
- if err != nil {
- code := 0
- if res != nil {
- code = res.StatusCode
- }
- return res, NewOpError("OpenUrl", err, code, true)
- }
- return res, nil
- })
- }
- func (this *Session) sessionActionOkStatus(req IRequest, method string, op string) (string, error) {
- res, reqErr := this.sessionAction(req, method, op)
- if reqErr != nil {
- return "", reqErr
- }
- defer res.Body.Close()
- buf, _ := ioutil.ReadAll(res.Body)
- bodyData := string(buf)
- statusErr := parseOKStatusString(bodyData, op, res.StatusCode)
- if statusErr != nil {
- return "", statusErr
- }
- return bodyData, nil
- }
- func parseOKStatusString(data string, op string, code int) error {
- // tracelog.Println("[", op, "]", data)
- status := util.ParseXmlNameData(data, []string{"res", "header", "status"})
- if status != "OK" {
- opErr := NewOpError(op, errors.New("Error status:"+status+data), code, false)
- return opErr
- }
- return nil
- }
- func (this *Session) sessionAction(req IRequest, method string, op string) (*http.Response, error) {
- var res *http.Response
- var err error
- this.mu.Lock()
- sid := this.sessionId
- this.mu.Unlock()
- if method == "POST" {
- res, err = httpPost(this.lmax_url, sid, req)
- } else {
- res, err = httpGet(this.lmax_url, sid, req)
- }
- if err != nil {
- code := 0
- if res != nil {
- code = res.StatusCode
- }
- return nil, NewOpError(op, err, code, true)
- }
- //这两个操作一般在初始化的时候完成,不会频繁操作,可以防止初始化的时候,因为这两个操作太慢而超时
- if op == "SearchInstrument" || op == "Subscribe" {
- this.UpdateRecvTime()
- }
- return res, nil
- }
|