package lmaxapi
import (
"errors"
"fmt"
"io"
"io/ioutil"
"tickserver/api/lmaxapi/request"
"tickserver/api/lmaxapi/response"
"tickserver/api/lmaxapi/util"
"log"
"net/http"
"strings"
"sync"
"time"
)
// tracelog is the package-wide trace logger. It starts out writing to
// ioutil.Discard, so trace lines are dropped until EnableLog installs a
// real writer.
var (
	tracelog = log.New(ioutil.Discard, "[trace]", log.Ltime)
)
// Session carries the state of one LMAX connection: the endpoint and session
// id used for requests, the event stream, the heartbeat ticker and the order
// book.
type Session struct {
	// lmax_url and sessionId identify the endpoint and the login; Login
	// rewrites both while holding mu.
	lmax_url  string
	sessionId string

	// running is set by Start and cleared by Stop, always under runLock.
	running bool
	// runLock guards running, stream and hearttick.
	runLock sync.RWMutex
	// mu guards sessionId, accountDetails and lastRecvTime.
	mu sync.RWMutex

	// hearttick is the ticker installed by HeartbeatTimeout.
	hearttick *time.Ticker
	// eventHandler receives stream events and errors and holds the
	// registered listeners.
	eventHandler *EventHandler
	// asynCallWait is not referenced in this part of the file; presumably
	// used by the asyn* helpers — TODO confirm.
	asynCallWait sync.WaitGroup
	// accountDetails is the account supplied at construction or by Login.
	accountDetails *response.AccountDetails
	// stream is the response of the most recent stream request made by Start.
	stream *http.Response
	// lastRecvTime is the time of the last UpdateRecvTime call.
	lastRecvTime time.Time
	// orderBook is created in NewSession.
	orderBook *OrderBook
	// heartstop is an unbuffered channel; HeartbeatTimeout sends on it to end
	// the previous heartbeat goroutine.
	heartstop chan int
}
// EnableLog routes trace output to w, prefixing every line with the date and
// a microsecond-resolution timestamp. A nil w is treated as ioutil.Discard so
// that later trace calls cannot dereference a nil writer.
//
// The existing logger is reconfigured in place instead of being replaced:
// log.Logger serialises SetFlags/SetOutput internally, whereas reassigning
// the package-level pointer would race with goroutines that are logging
// through tracelog at the same moment.
func EnableLog(w io.Writer) {
	if w == nil {
		w = ioutil.Discard
	}
	tracelog.SetFlags(log.Ldate | log.Lmicroseconds)
	tracelog.SetOutput(w)
}
// DisableLog silences trace output by pointing the trace logger at
// ioutil.Discard.
//
// The logger is reconfigured in place (log.Logger serialises these setters
// internally) rather than replaced, so goroutines that are concurrently
// logging through tracelog never race with a pointer reassignment.
func DisableLog() {
	tracelog.SetOutput(ioutil.Discard)
	tracelog.SetFlags(log.Ldate | log.Lmicroseconds)
}
// NewSession builds a Session for the given endpoint url, session id and
// account details. The session is not running yet; its receive time is
// initialised to the current time.
//
// As a side effect a new Mtf is created and its Start method is launched in a
// separate goroutine; that Mtf is handed to the session's order book.
func NewSession(url string, sid string, account *response.AccountDetails) *Session {
	session := &Session{
		lmax_url:       url,
		sessionId:      sid,
		accountDetails: account,
	}
	session.lastRecvTime = time.Now()
	session.eventHandler = NewEventHandler(session)

	feed := NewMtf()
	go feed.Start()
	session.orderBook = NewOrderBook(feed)

	session.heartstop = make(chan int)
	return session
}
// Login authenticates through the package-level Login function and, on
// success, adopts the endpoint, session id and account details of the session
// it returns. The receive time is refreshed afterwards.
//
// Any error from the package-level Login is returned unchanged and the
// receiver is left untouched.
func (this *Session) Login(username string, password string, productType string) error {
	fresh, err := Login(request.NewLoginRequest(username, password, productType))
	if err != nil {
		return err
	}
	// Only the identity of the temporary session is kept, so its order book
	// is stopped right away.
	fresh.orderBook.Stop()

	this.mu.Lock()
	this.lmax_url, this.sessionId, this.accountDetails = fresh.lmax_url, fresh.sessionId, fresh.accountDetails
	this.mu.Unlock()

	this.UpdateRecvTime()
	return nil
}
// IsLogin reports whether the session currently holds a non-empty session id.
func (this *Session) IsLogin() bool {
	this.mu.Lock()
	loggedIn := this.sessionId != ""
	this.mu.Unlock()
	return loggedIn
}
// GetOrderBook returns the order book created together with the session.
func (this *Session) GetOrderBook() *OrderBook {
	book := this.orderBook
	return book
}
// LoadAllInstruments pages through SearchInstruments with an empty query and
// returns every instrument received. Paging starts at offset 0 and continues
// from the Id of the last instrument of each page for as long as the search
// reports more results.
//
// If cb is non-nil it is invoked once per instrument, in arrival order, as
// each page is received.
//
// The first search error aborts the load and is returned with a nil slice.
// A page that is empty while more results are reported is also returned as an
// error: there is no last Id to continue from, and indexing the empty page
// would otherwise panic.
func (this *Session) LoadAllInstruments(cb func(inst *response.Instrument)) ([]*response.Instrument, error) {
	var all []*response.Instrument
	offset := int64(0)
	for {
		searchReq := request.NewSearchInstrumentRequest("", offset)
		page, hasMore, err := this.SearchInstruments(searchReq, nil)
		if err != nil {
			return nil, err
		}
		if cb != nil {
			for _, inst := range page {
				cb(inst)
			}
		}
		all = append(all, page...)
		if !hasMore {
			return all, nil
		}
		if len(page) == 0 {
			return nil, NewOpError("LoadAllInstruments", errors.New("more results reported but the page was empty"), 0, false)
		}
		offset = page[len(page)-1].Id
	}
}
// UpdateRecvTime records the current time as the moment the session last
// received something.
func (this *Session) UpdateRecvTime() {
	this.mu.Lock()
	this.lastRecvTime = time.Now()
	this.mu.Unlock()
}
// LastRecvTime returns the time stored by the most recent UpdateRecvTime call
// (or by NewSession if there has been none).
func (this *Session) LastRecvTime() time.Time {
	this.mu.Lock()
	last := this.lastRecvTime
	this.mu.Unlock()
	return last
}
// Logout POSTs a logout request and checks that the reply carries an OK
// status. The work is dispatched through asynCallback together with callback;
// the result is the request error, the status error, or nil.
func (this *Session) Logout(callback Callback) error {
	logout := func() error {
		// sessionActionOkStatus performs the request, closes the body and
		// validates the status; the body text itself is not needed here.
		_, err := this.sessionActionOkStatus(request.NewLogoutRequest(), "POST", "Logout")
		return err
	}
	return this.asynCallback(callback, logout)
}
// Subscribe POSTs the subscription request req and checks that the reply
// carries an OK status. The work is dispatched through asynCallback together
// with callback; the result is the request error, the status error, or nil.
func (this *Session) Subscribe(req IRequest, callback Callback) error {
	subscribe := func() error {
		// sessionActionOkStatus performs the request, closes the body and
		// validates the status; the body text itself is not needed here.
		_, err := this.sessionActionOkStatus(req, "POST", "Subscribe")
		return err
	}
	return this.asynCallback(callback, subscribe)
}
// Unsubscribe redirects req to "/secure/unsubscribe", POSTs it and checks
// that the reply carries an OK status. The work is dispatched through
// asynCallback together with callback.
//
// req must implement UrlSetter; note that its URL is modified in place. A
// request that does not implement UrlSetter yields an error instead of a
// panic inside the dispatched closure. Status errors are reported under the
// operation name "Unsubscribe" (they were previously mislabelled
// "Subscribe").
func (this *Session) Unsubscribe(req IRequest, callback Callback) error {
	return this.asynCallback(callback, func() error {
		setter, ok := req.(UrlSetter)
		if !ok {
			return NewOpError("Unsubscribe", errors.New("request does not implement UrlSetter"), 0, false)
		}
		setter.SetUrl("/secure/unsubscribe")
		_, err := this.sessionActionOkStatus(req, "POST", "Unsubscribe")
		return err
	})
}
// SearchInstruments issues req as a GET, validates the reply status and
// parses the body into instruments. It returns the instruments of this page
// and whether the reply reports more results. The work is dispatched through
// asynSearchInstrumentCallback together with searchCallback.
func (this *Session) SearchInstruments(req IRequest, searchCallback SearchInstrumentCallback) ([]*response.Instrument, bool, error) {
	search := func() ([]*response.Instrument, bool, error) {
		// The action is named "SearchInstrument" (sessionAction keys on that
		// exact name); status errors are reported as "SearchInstruments".
		res, err := this.sessionAction(req, "GET", "SearchInstrument")
		if err != nil {
			return nil, false, err
		}
		defer res.Body.Close()

		// Read errors are ignored; whatever was read is validated below.
		raw, _ := ioutil.ReadAll(res.Body)
		text := string(raw)
		if statusErr := parseOKStatusString(text, "SearchInstruments", res.StatusCode); statusErr != nil {
			return nil, false, statusErr
		}
		found := response.NewInstruments(text)
		return found.Data, found.HasMoreResults, nil
	}
	return this.asynSearchInstrumentCallback(searchCallback, search)
}
// GetCompletedOrder issues req as a GET, validates the reply status and
// parses the body into completed orders. The work is dispatched through
// asynGetCompletedOrder together with callback.
//
// The string result is the continuation offset: the OrderId of the last order
// of the page when the reply reports more results, otherwise "". An empty
// page also yields "" — there is no last order to continue from, and indexing
// the empty page would otherwise panic.
func (this *Session) GetCompletedOrder(req IRequest, callback CompletedOrderCallback) ([]*response.CompletedOrder, string, error) {
	return this.asynGetCompletedOrder(callback, func() ([]*response.CompletedOrder, string, error) {
		// Request, body handling and status check are shared with the other
		// single-shot operations.
		bodyData, reqErr := this.sessionActionOkStatus(req, "GET", "CompletedOrder")
		if reqErr != nil {
			return nil, "", reqErr
		}
		orders, err := response.NewCompletedOrders(bodyData)
		if err != nil {
			return nil, "", err
		}
		if !orders.HasMoreResults || len(orders.Data) == 0 {
			return orders.Data, "", nil
		}
		return orders.Data, orders.Data[len(orders.Data)-1].OrderId, nil
	})
}
// GetAccountStatement issues req as a GET, validates the reply status and
// parses the body into account statements. It returns the statements of this
// page and whether the reply reports more results. The work is dispatched
// through asynGetAccountStatement together with callback.
func (this *Session) GetAccountStatement(req IRequest, callback AccountStatementCallback) ([]*response.AccountStatement, bool, error) {
	fetch := func() ([]*response.AccountStatement, bool, error) {
		bodyData, reqErr := this.sessionActionOkStatus(req, "GET", "AccountStatement")
		if reqErr != nil {
			return nil, false, reqErr
		}
		page, parseErr := response.NewAccountStatements(bodyData)
		if parseErr != nil {
			return nil, false, parseErr
		}
		return page.Data, page.HasMoreResults, nil
	}
	return this.asynGetAccountStatement(callback, fetch)
}
// GetOrderTransaction issues req as a GET, validates the reply status and
// parses the body into order transactions. It returns the transactions of
// this page and whether the reply reports more results. The work is
// dispatched through asynGetOrderTransaction together with callback.
func (this *Session) GetOrderTransaction(req IRequest, callback OrderTransactionCallback) ([]*response.OrderTransaction, bool, error) {
	fetch := func() ([]*response.OrderTransaction, bool, error) {
		bodyData, reqErr := this.sessionActionOkStatus(req, "GET", "OrderTransaction")
		if reqErr != nil {
			return nil, false, reqErr
		}
		page, parseErr := response.NewOrderTransactions(bodyData)
		if parseErr != nil {
			return nil, false, parseErr
		}
		return page.Data, page.HasMoreResults, nil
	}
	return this.asynGetOrderTransaction(callback, fetch)
}
// GetActivity fetches one page of activities for req. A request of type
// *request.OrderActivityAuditTrailRequest is sent as a POST under the
// operation name "OrderActivityAuditTrailRequest"; any other request is sent
// as a GET under "Activity". The reply status is validated and the body
// parsed; the result is the page and whether more results are reported. The
// work is dispatched through asynGetActivity together with callback.
func (this *Session) GetActivity(req IRequest, callback ActivityCallback) ([]*response.Activity, bool, error) {
	fetch := func() ([]*response.Activity, bool, error) {
		method, action := "GET", "Activity"
		if _, isAuditTrail := req.(*request.OrderActivityAuditTrailRequest); isAuditTrail {
			method, action = "POST", "OrderActivityAuditTrailRequest"
		}
		bodyData, reqErr := this.sessionActionOkStatus(req, method, action)
		if reqErr != nil {
			return nil, false, reqErr
		}
		page, parseErr := response.NewActivitys(bodyData)
		if parseErr != nil {
			return nil, false, parseErr
		}
		return page.Data, page.HasMoreResults, nil
	}
	return this.asynGetActivity(callback, fetch)
}
// Start opens the event stream and keeps processing it until the session is
// stopped or the server answers 403. It blocks for the whole lifetime of the
// stream.
//
// Returns an OpError when the session is already running, an OpError with
// code 403 when the session has expired (after notifying the
// session-disconnected handler), and nil once the loop ends because running
// was cleared. Request and stream errors in between are handed to the event
// handler and the stream request is issued again.
//
// NOTE(review): on the 403 path running is left set (the Stop call is
// commented out in the original), so the caller is expected to call Stop
// before starting again — confirm.
func (this *Session) Start() error {
	tracelog.Println("begin start")
	// The check and the set of running must happen atomically, so both are
	// done under a single hold of runLock.
	this.runLock.Lock()
	if this.running { // already running
		this.runLock.Unlock()
		return NewOpError("Stream", errors.New("Error stream status,stream is running"), 0, false)
	}
	this.running = true
	this.runLock.Unlock()
	streamReq := request.NewStreamRequest()
	// The loop ends on either of two conditions: a 403 reply, or Stop being
	// called from outside.
	for this.Isrunning() {
		res, err := this.sessionAction(streamReq, "POST", "Stream")
		// stream is read by Stop/StopStream from other goroutines, hence the lock.
		this.runLock.Lock()
		this.stream = res
		this.runLock.Unlock()
		if err != nil {
			// NOTE(review): the request is retried immediately, without any
			// back-off — confirm the error handler throttles or stops.
			this.eventHandler.HandleEventError(err)
			continue
		}
		// A 403 means the session has expired and a new login is required.
		if res.StatusCode == 403 {
			// processStreamResponse, which normally closes the body, is not
			// reached on this path; close it here so the connection is released.
			res.Body.Close()
			this.eventHandler.HandleEventSessionDisconnected()
			return NewOpError("Stream", errors.New("Error ssesion is out of date"), 403, true)
		}
		// Handle the events carried by the stream.
		tracelog.Println("begin processStreamResponse")
		err = this.processStreamResponse()
		tracelog.Println("end processStreamResponse")
		if err != nil {
			this.eventHandler.HandleEventError(NewOpError("Stream", err, res.StatusCode, true))
		}
	}
	tracelog.Println("end start")
	time.Sleep(time.Second)
	return nil
}
// processStreamResponse reads the current stream body in small chunks,
// accumulates the text and hands every complete "events" node to the event
// handler. The body is always closed on return.
//
// It returns the read error that ended the loop, or the last read error (nil
// after a successful read) if the loop ended because the session stopped
// running.
func (this *Session) processStreamResponse() error {
	// Guarantee that the connection gets closed.
	res := this.stream
	defer res.Body.Close()
	buf := make([]byte, 256) // read chunk size; longer nodes are assembled in bodyData
	var bodyData, nodeName, nodeData string
	var n int
	var err error
	for this.Isrunning() {
		n, err = res.Body.Read(buf)
		if n == 0 && err != nil {
			break
		}
		// A Read may return data together with an error (typically the final
		// chunk alongside io.EOF); that data is processed before stopping.
		this.UpdateRecvTime()
		bodyData += string(buf[:n])
		for {
			// presumably ParseXmlNode yields the next complete node and the
			// unconsumed remainder, with an empty name when none is complete
			// — TODO confirm against util.
			nodeName, nodeData, bodyData = util.ParseXmlNode(bodyData)
			if nodeName == "" {
				break
			}
			switch nodeName {
			case "events":
				// NOTE(review): the search string is empty, so Index always
				// returns 0 and the flag passed below is always true; the
				// intended marker text appears to be missing — confirm.
				index := strings.Index(nodeData, "")
				if index != -1 {
					this.eventHandler.HandleEventData(nodeData, true)
				} else {
					this.eventHandler.HandleEventData(nodeData, false)
				}
			default:
				// Other node types are ignored.
			}
		}
		if err != nil {
			break
		}
	}
	return err
}
// PlaceMarketOrder submits marketOrder through the shared order path under
// the operation name "PlaceMarketOrder" and returns that path's result.
func (this *Session) PlaceMarketOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
	const op = "PlaceMarketOrder"
	return this.order(marketOrder, callback, op)
}
// PlaceLimitOrder submits the given request through the shared order path
// under the operation name "LimitOrder" and returns that path's result.
func (this *Session) PlaceLimitOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
	const op = "LimitOrder"
	return this.order(marketOrder, callback, op)
}
// PlaceStopOrder submits the given request through the shared order path
// under the operation name "StopOrder" and returns that path's result.
func (this *Session) PlaceStopOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
	const op = "StopOrder"
	return this.order(marketOrder, callback, op)
}
// CancelOrder submits cancelOrder through the shared order path under the
// operation name "CancelOrder" and returns that path's result.
func (this *Session) CancelOrder(cancelOrder IRequest, callback OrderCallback) (string, error) {
	const op = "CancelOrder"
	return this.order(cancelOrder, callback, op)
}
// PlaceClosingOrder submits the closing order through the shared order path
// under the operation name "ClosingOrder" and returns that path's result.
func (this *Session) PlaceClosingOrder(order *request.ClosingOrderRequest, callback OrderCallback) (string, error) {
	const op = "ClosingOrder"
	return this.order(order, callback, op)
}
// PlaceAmendStopsOrder submits the amend-stops order through the shared order
// path under the operation name "AmendStopsOrder" and returns that path's
// result.
func (this *Session) PlaceAmendStopsOrder(order *request.AmendStopsOrderRequest, callback OrderCallback) (string, error) {
	const op = "AmendStopsOrder"
	return this.order(order, callback, op)
}
// order is the shared path of all order operations: it POSTs marketOrder
// under the operation name op, requires an OK status and returns the value
// found at res/body/instructionId in the reply. The work is dispatched
// through asynOrderCallback together with callback.
func (this *Session) order(marketOrder IRequest, callback OrderCallback, op string) (string, error) {
	submit := func() (string, error) {
		reply, err := this.sessionActionOkStatus(marketOrder, "POST", op)
		if err != nil {
			return "", err
		}
		path := []string{"res", "body", "instructionId"}
		return util.ParseXmlNameData(reply, path), nil
	}
	return this.asynOrderCallback(callback, submit)
}
// OrderActions submits order through the shared order path under the
// operation name "OrderActions" and returns that path's result.
func (this *Session) OrderActions(order *OrderActionsRequest, callback OrderCallback) (string, error) {
	const op = "OrderActions"
	return this.order(order, callback, op)
}
// HeartbeatTimeout subscribes to heartbeat events and starts a goroutine that,
// every t, checks how long ago the session last received anything and then
// sends a heartbeat request. When more than t plus one second has passed since
// the last receive, a "Heartbeat timeout" OpError is passed to the event
// handler.
//
// It is meant to be started together with Login and to stop when the session
// stops. Calling it again replaces the ticker and signals the previous
// goroutine to exit.
//
// NOTE(review): the error returned by Subscribe is discarded.
// NOTE(review): Stop only stops the ticker; nothing in this file sends on
// heartstop at that point, so the goroutine appears to stay parked until the
// next HeartbeatTimeout call — confirm.
func (this *Session) HeartbeatTimeout(t time.Duration) {
req2 := request.NewHeartbeatSubscriptionRequest()
this.Subscribe(req2, nil)
first := true
c := time.NewTicker(t)
this.runLock.Lock()
if this.hearttick != nil {
this.hearttick.Stop()
// Stop the previous heartbeat goroutine.
// NOTE(review): heartstop is unbuffered, so this send blocks while runLock
// is held until that goroutine reaches its select; if its error handler
// calls Stop (which takes runLock) this could deadlock — confirm.
this.heartstop <- 1
}
this.hearttick = c
this.runLock.Unlock()
go func() {
tracelog.Println("start heart beat.")
for {
select {
case <-c.C:
tracelog.Println("heart beat check begin.")
reciveTime := this.LastRecvTime()
// The very first tick is not checked: a heartbeat has to be sent first.
tracelog.Println("heart beat offset", time.Now().Sub(reciveTime), reciveTime)
if !first && time.Now().Sub(reciveTime) > t+1*time.Second {
err := NewOpError("Stream", errors.New("Heartbeat timeout"), -1, true)
this.eventHandler.HandleEventError(err)
}
first = false
req3 := request.NewHeartbeatRequest("hello")
this.RequestHeartbeat(req3, func(str string, err error) {
tracelog.Println("RequestHeartbeat", str, err)
})
tracelog.Println("heart beat check end.")
case <-this.heartstop:
tracelog.Println("end heart beat.")
return
}
}
}()
}
// RequestHeartbeat POSTs heartbeatRequest, requires an OK status and returns
// the value found at res/body/token in the reply. The work is dispatched
// through asynHeartbeatCallback together with callback.
func (this *Session) RequestHeartbeat(heartbeatRequest *request.HeartbeatRequest, callback HeartbeatCallback) (string, error) {
	beat := func() (string, error) {
		reply, err := this.sessionActionOkStatus(heartbeatRequest, "POST", "Heartbeat")
		if err != nil {
			return "", err
		}
		path := []string{"res", "body", "token"}
		return util.ParseXmlNameData(reply, path), nil
	}
	return this.asynHeartbeatCallback(callback, beat)
}
// RequestHistoricMarketData POSTs the given request and requires an OK
// status; the reply body is not used. The work is dispatched through
// asynCallback together with callback.
func (this *Session) RequestHistoricMarketData(request IRequest, callback Callback) error {
	send := func() error {
		if _, err := this.sessionActionOkStatus(request, "POST", "RequestHistorMarketData"); err != nil {
			return err
		}
		return nil
	}
	return this.asynCallback(callback, send)
}
// Isrunning reports whether the session is marked as running, i.e. Start has
// set the flag and Stop has not cleared it yet.
func (this *Session) Isrunning() bool {
	this.runLock.RLock()
	active := this.running
	this.runLock.RUnlock()
	return active
}
// Stop halts a running session: the heartbeat ticker (if any) is stopped, the
// running flag is cleared and the current stream connection is closed. It
// does nothing when the session is not running.
func (this *Session) Stop() {
	this.runLock.Lock()
	defer this.runLock.Unlock()

	if this.running {
		if ticker := this.hearttick; ticker != nil {
			ticker.Stop()
		}
		this.running = false
		this.stopStream()
	}
}
// stopStream closes the body of the current stream response, if there is
// one. Both callers in this file (Stop and StopStream) hold runLock while
// calling it.
func (this *Session) stopStream() {
	current := this.stream
	if current == nil || current.Body == nil {
		return
	}
	// Close the stream connection.
	current.Body.Close()
}
// StopStream closes only the current stream connection; the running flag and
// the login are left untouched. Its purpose is to restart the stream without
// logging in again.
func (this *Session) StopStream() {
	this.runLock.Lock()
	defer this.runLock.Unlock()

	this.stopStream()
}
// RegisterAccountStateEvent installs cb as the event handler's
// OnAccountStateEvent listener, replacing any previous one.
func (this *Session) RegisterAccountStateEvent(cb AccountStateListener) {
	handler := this.eventHandler
	handler.OnAccountStateEvent = cb
}
// RegisterInitEvent installs cb as the event handler's OnInit listener,
// replacing any previous one.
func (this *Session) RegisterInitEvent(cb InitListener) {
	handler := this.eventHandler
	handler.OnInit = cb
}
// RegisterOneExecutionEvent installs cb as the event handler's OnOneExecution
// listener, replacing any previous one.
func (this *Session) RegisterOneExecutionEvent(cb OneExecutionListener) {
	handler := this.eventHandler
	handler.OnOneExecution = cb
}
// RegisterExecutionEvent installs cb as the event handler's OnExecutionEvent
// listener, replacing any previous one.
func (this *Session) RegisterExecutionEvent(cb ExecutionListener) {
	handler := this.eventHandler
	handler.OnExecutionEvent = cb
}
// RegisterHeartbeatEvent installs cb as the event handler's OnHeartbeatEvent
// listener, replacing any previous one.
func (this *Session) RegisterHeartbeatEvent(cb HeartbeatListener) {
	handler := this.eventHandler
	handler.OnHeartbeatEvent = cb
}
// RegisterHistoricMarketDataEvent installs cb as the event handler's
// OnHistoricEvent listener, replacing any previous one.
func (this *Session) RegisterHistoricMarketDataEvent(cb HistoricMarketDataListener) {
	handler := this.eventHandler
	handler.OnHistoricEvent = cb
}
// RegisterInstructionRejectedEvent installs cb as the event handler's
// OnInstructionRejected listener, replacing any previous one.
func (this *Session) RegisterInstructionRejectedEvent(cb InstructionRejectedListener) {
	handler := this.eventHandler
	handler.OnInstructionRejected = cb
}
// RegisterOrderBookEvent installs cb as the event handler's OnOrderBookEvent
// listener, replacing any previous one.
func (this *Session) RegisterOrderBookEvent(cb OrderBookListener) {
	handler := this.eventHandler
	handler.OnOrderBookEvent = cb
}
// RegisterOrderBookStatusEvent installs cb as the event handler's
// OnOrderBookStatusEvent listener, replacing any previous one.
func (this *Session) RegisterOrderBookStatusEvent(cb OrderBookStatusListener) {
	handler := this.eventHandler
	handler.OnOrderBookStatusEvent = cb
}
// RegisterOrderEvent installs cb as the event handler's OnOrderEvent
// listener, replacing any previous one.
func (this *Session) RegisterOrderEvent(cb OrderListener) {
	handler := this.eventHandler
	handler.OnOrderEvent = cb
}
// RegisterPositionEvent installs cb as the event handler's OnPositionEvent
// listener, replacing any previous one.
func (this *Session) RegisterPositionEvent(cb PositionListener) {
	handler := this.eventHandler
	handler.OnPositionEvent = cb
}
// RegisterExchangeRateEvent installs cb as the event handler's
// OnExchangeRateEvent listener, replacing any previous one.
func (this *Session) RegisterExchangeRateEvent(cb ExchangeRateListener) {
	handler := this.eventHandler
	handler.OnExchangeRateEvent = cb
}
// RegisterStreamFailureEvent installs cb as the event handler's
// OnStreamFailure listener, replacing any previous one.
func (this *Session) RegisterStreamFailureEvent(cb StreamFailureListener) {
	handler := this.eventHandler
	handler.OnStreamFailure = cb
}
// RegisterSessionDisconnected installs cb as the event handler's
// OnSessionDisconnected listener, replacing any previous one.
func (this *Session) RegisterSessionDisconnected(cb SessionDisconnectedListener) {
	handler := this.eventHandler
	handler.OnSessionDisconnected = cb
}
// GetAccountDetails returns the account details currently attached to the
// session (set by NewSession and replaced by Login).
func (this *Session) GetAccountDetails() *response.AccountDetails {
	this.mu.Lock()
	details := this.accountDetails
	this.mu.Unlock()
	return details
}
// KeepAlive issues a keep-alive GET every d for as long as the session is
// running. It blocks until the running flag is cleared, so callers normally
// run it in its own goroutine.
//
// A failed request is written to the trace log and retried after the next
// sleep. The body of a successful response is closed immediately; its content
// is not used, and leaving it open would leak one connection per iteration.
func (this *Session) KeepAlive(d time.Duration) {
	req := &request.KeepAliveRequest{}
	for this.Isrunning() {
		res, err := this.sessionAction(req, "GET", "KeepAlive")
		if err != nil {
			tracelog.Println("KeepAlive", err)
		} else if res != nil && res.Body != nil {
			res.Body.Close()
		}
		fmt.Println("KeepAlive sleep")
		time.Sleep(d)
	}
}
// OpenUrl opens url with the current session id through httpOpen. On failure
// the error is wrapped in an "OpenUrl" OpError carrying the response status
// code (0 when there is no response), and the response — possibly nil — is
// returned alongside it. The work is dispatched through asynUrlCallback
// together with callback.
func (this *Session) OpenUrl(url string, callback UrlCallback) (*http.Response, error) {
	open := func() (*http.Response, error) {
		this.mu.Lock()
		sid := this.sessionId
		this.mu.Unlock()

		res, err := httpOpen(url, sid)
		if err == nil {
			return res, nil
		}
		status := 0
		if res != nil {
			status = res.StatusCode
		}
		return res, NewOpError("OpenUrl", err, status, true)
	}
	return this.asynUrlCallback(callback, open)
}
// sessionActionOkStatus performs req with the given HTTP method under the
// operation name op, reads the whole reply and requires its status to be OK.
// It returns the reply body as text.
//
// Errors: the request error from sessionAction, an OpError wrapping a failure
// to read the body (so a truncated reply is never treated as valid), or the
// status error from parseOKStatusString. The body is always closed.
func (this *Session) sessionActionOkStatus(req IRequest, method string, op string) (string, error) {
	res, reqErr := this.sessionAction(req, method, op)
	if reqErr != nil {
		return "", reqErr
	}
	defer res.Body.Close()
	buf, readErr := ioutil.ReadAll(res.Body)
	if readErr != nil {
		// Reported like the transport failures in sessionAction.
		return "", NewOpError(op, readErr, res.StatusCode, true)
	}
	bodyData := string(buf)
	if statusErr := parseOKStatusString(bodyData, op, res.StatusCode); statusErr != nil {
		return "", statusErr
	}
	return bodyData, nil
}
// parseOKStatusString extracts res/header/status from data and returns nil
// when it equals "OK". Otherwise it returns an OpError for op carrying code,
// whose message contains the status found followed by the full reply text.
func parseOKStatusString(data string, op string, code int) error {
	statusPath := []string{"res", "header", "status"}
	if status := util.ParseXmlNameData(data, statusPath); status != "OK" {
		return NewOpError(op, errors.New("Error status:"+status+data), code, false)
	}
	return nil
}
// sessionAction sends req to the session endpoint with the current session
// id, using httpPost when method is "POST" and httpGet otherwise. On success
// the response is returned and the caller must close its body.
//
// On failure the result is a nil response and an OpError for op carrying the
// response status code (0 when there is no response). Should a response
// accompany the error, its body is closed here, because the caller never
// sees it.
//
// The endpoint URL and the session id are read together under mu, since
// Login rewrites both under that lock.
func (this *Session) sessionAction(req IRequest, method string, op string) (*http.Response, error) {
	this.mu.Lock()
	url, sid := this.lmax_url, this.sessionId
	this.mu.Unlock()

	var res *http.Response
	var err error
	if method == "POST" {
		res, err = httpPost(url, sid, req)
	} else {
		res, err = httpGet(url, sid, req)
	}
	if err != nil {
		code := 0
		if res != nil {
			code = res.StatusCode
			if res.Body != nil {
				res.Body.Close()
			}
		}
		return nil, NewOpError(op, err, code, true)
	}
	// These two operations normally happen during initialisation and are not
	// frequent; refreshing the receive time here keeps a slow initialisation
	// from being reported as a timeout.
	if op == "SearchInstrument" || op == "Subscribe" {
		this.UpdateRecvTime()
	}
	return res, nil
}