package lmaxapi import ( "errors" "fmt" "io" "io/ioutil" "tickserver/api/lmaxapi/request" "tickserver/api/lmaxapi/response" "tickserver/api/lmaxapi/util" "log" "net/http" "strings" "sync" "time" ) var tracelog = log.New(ioutil.Discard, "[trace]", log.Ltime) type Session struct { lmax_url, sessionId string running bool runLock sync.RWMutex mu sync.RWMutex hearttick *time.Ticker eventHandler *EventHandler asynCallWait sync.WaitGroup accountDetails *response.AccountDetails stream *http.Response lastRecvTime time.Time orderBook *OrderBook heartstop chan int } func EnableLog(w io.Writer) { tracelog = log.New(w, "[trace]", log.Ldate|log.Lmicroseconds) } func DisableLog() { tracelog = log.New(ioutil.Discard, "[trace]", log.Ldate|log.Lmicroseconds) } func NewSession(url string, sid string, account *response.AccountDetails) *Session { //log.Println("NewSession") s := Session{lmax_url: url, sessionId: sid, running: false, accountDetails: account} s.lastRecvTime = time.Now() s.eventHandler = NewEventHandler(&s) mtf := NewMtf() go mtf.Start() s.orderBook = NewOrderBook(mtf) s.heartstop = make(chan int) return &s } func (this *Session) Login(username string, password string, productType string) error { req := request.NewLoginRequest(username, password, productType) session, err := Login(req) if err != nil { return err } session.orderBook.Stop() this.mu.Lock() this.lmax_url = session.lmax_url this.sessionId = session.sessionId this.accountDetails = session.accountDetails this.mu.Unlock() this.UpdateRecvTime() return nil } func (this *Session) IsLogin() bool { this.mu.Lock() defer this.mu.Unlock() return this.sessionId != "" } func (this *Session) GetOrderBook() *OrderBook { return this.orderBook } func (this *Session) LoadAllInstruments(cb func(inst *response.Instrument)) ([]*response.Instrument, error) { offset := int64(0) var ret []*response.Instrument for { searchReq := request.NewSearchInstrumentRequest("", offset) insList, hasMore, err := this.SearchInstruments(searchReq, nil) if err != nil { return nil, err } if cb != nil { for _, value := range insList { cb(value) } } if !hasMore { ret = append(ret, insList...) return ret, nil } ret = append(ret, insList...) offset = insList[len(insList)-1].Id } panic("never reached.") } func (this *Session) UpdateRecvTime() { this.mu.Lock() defer this.mu.Unlock() this.lastRecvTime = time.Now() } func (this *Session) LastRecvTime() time.Time { this.mu.Lock() defer this.mu.Unlock() return this.lastRecvTime } func (this *Session) Logout(callback Callback) error { return this.asynCallback(callback, func() error { req := request.NewLogoutRequest() res, reqerr := this.sessionAction(req, "POST", "Logout") if reqerr != nil { return reqerr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bufStr := string(buf) err := parseOKStatusString(bufStr, "Logout", res.StatusCode) return err }) } func (this *Session) Subscribe(req IRequest, callback Callback) error { return this.asynCallback(callback, func() error { res, reqerr := this.sessionAction(req, "POST", "Subscribe") if reqerr != nil { return reqerr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bufStr := string(buf) err := parseOKStatusString(bufStr, "Subscribe", res.StatusCode) return err }) } func (this *Session) Unsubscribe(req IRequest, callback Callback) error { return this.asynCallback(callback, func() error { req.(UrlSetter).SetUrl("/secure/unsubscribe") res, reqerr := this.sessionAction(req, "POST", "Unsubscribe") if reqerr != nil { return reqerr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bufStr := string(buf) err := parseOKStatusString(bufStr, "Subscribe", res.StatusCode) return err }) } func (this *Session) SearchInstruments(req IRequest, searchCallback SearchInstrumentCallback) ([]*response.Instrument, bool, error) { return this.asynSearchInstrumentCallback(searchCallback, func() ([]*response.Instrument, bool, error) { res, reqErr := this.sessionAction(req, "GET", "SearchInstrument") if reqErr != nil { return nil, false, reqErr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bodyData := string(buf) statusErr := parseOKStatusString(bodyData, "SearchInstruments", res.StatusCode) if statusErr != nil { return nil, false, statusErr } insts := response.NewInstruments(bodyData) return insts.Data, insts.HasMoreResults, nil }) } func (this *Session) GetCompletedOrder(req IRequest, callback CompletedOrderCallback) ([]*response.CompletedOrder, string, error) { return this.asynGetCompletedOrder(callback, func() ([]*response.CompletedOrder, string, error) { res, reqErr := this.sessionAction(req, "GET", "CompletedOrder") if reqErr != nil { return nil, "", reqErr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bodyData := string(buf) statusErr := parseOKStatusString(bodyData, "CompletedOrder", res.StatusCode) if statusErr != nil { return nil, "", statusErr } insts, err := response.NewCompletedOrders(bodyData) if err != nil { return nil, "", err } if insts.HasMoreResults == false { return insts.Data, "", nil } offset := insts.Data[len(insts.Data)-1].OrderId return insts.Data, offset, nil }) } func (this *Session) GetAccountStatement(req IRequest, callback AccountStatementCallback) ([]*response.AccountStatement, bool, error) { return this.asynGetAccountStatement(callback, func() ([]*response.AccountStatement, bool, error) { res, reqErr := this.sessionAction(req, "GET", "AccountStatement") if reqErr != nil { return nil, false, reqErr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bodyData := string(buf) statusErr := parseOKStatusString(bodyData, "AccountStatement", res.StatusCode) if statusErr != nil { return nil, false, statusErr } insts, err := response.NewAccountStatements(bodyData) if err != nil { return nil, false, err } return insts.Data, insts.HasMoreResults, nil }) } func (this *Session) GetOrderTransaction(req IRequest, callback OrderTransactionCallback) ([]*response.OrderTransaction, bool, error) { return this.asynGetOrderTransaction(callback, func() ([]*response.OrderTransaction, bool, error) { res, reqErr := this.sessionAction(req, "GET", "OrderTransaction") if reqErr != nil { return nil, false, reqErr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bodyData := string(buf) statusErr := parseOKStatusString(bodyData, "OrderTransaction", res.StatusCode) if statusErr != nil { return nil, false, statusErr } insts, err := response.NewOrderTransactions(bodyData) if err != nil { return nil, false, err } return insts.Data, insts.HasMoreResults, nil }) } func (this *Session) GetActivity(req IRequest, callback ActivityCallback) ([]*response.Activity, bool, error) { return this.asynGetActivity(callback, func() ([]*response.Activity, bool, error) { _, ok := req.(*request.OrderActivityAuditTrailRequest) method := "GET" action := "Activity" if ok { method = "POST" action = "OrderActivityAuditTrailRequest" } res, reqErr := this.sessionAction(req, method, action) if reqErr != nil { return nil, false, reqErr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bodyData := string(buf) statusErr := parseOKStatusString(bodyData, action, res.StatusCode) if statusErr != nil { return nil, false, statusErr } insts, err := response.NewActivitys(bodyData) if err != nil { return nil, false, err } return insts.Data, insts.HasMoreResults, nil }) } //compare and set 整个操作要原子 func (this *Session) Start() error { tracelog.Println("begin start") this.runLock.Lock() if this.running { //如果已经running了 this.runLock.Unlock() return NewOpError("Stream", errors.New("Error stream status,stream is running"), 0, false) } this.running = true //running 设置成true this.runLock.Unlock() streamReq := request.NewStreamRequest() //start 停止的条件:1. 发生403错误 2. 被外界stop for this.Isrunning() { res, err := this.sessionAction(streamReq, "POST", "Stream") this.runLock.Lock() this.stream = res //这里要加锁,stream 会被stop 函数在多线程环境下使用 this.runLock.Unlock() if err != nil { this.eventHandler.HandleEventError(err) continue } //检查状态码,这个时候是session 过期了,要重新登录 if this.stream.StatusCode == 403 { this.eventHandler.HandleEventSessionDisconnected() // this.Stop() return NewOpError("Stream", errors.New("Error ssesion is out of date"), 403, true) } //处理事件 tracelog.Println("begin processStreamResponse") err = this.processStreamResponse() tracelog.Println("end processStreamResponse") if err != nil { this.eventHandler.HandleEventError(NewOpError("Stream", err, res.StatusCode, true)) } } tracelog.Println("end start") time.Sleep(time.Second) return nil } func (this *Session) processStreamResponse() error { //保证:连接会执行close res := this.stream defer res.Body.Close() buf := make([]byte, 256) //这块内存应该能够应付最大的 var bodyData, nodeName, nodeData string var n int var err error for this.Isrunning() { n, err = res.Body.Read(buf) if err != nil { break } this.UpdateRecvTime() str := string(buf[0:n]) bodyData = bodyData + str for { nodeName, nodeData, bodyData = util.ParseXmlNode(bodyData) if nodeName == "" { break } switch nodeName { case "events": index := strings.Index(nodeData, "
0
") if index != -1 { this.eventHandler.HandleEventData(nodeData, true) } else { this.eventHandler.HandleEventData(nodeData, false) } default: // fmt.Println("eventData:", nodeData) } } } return err } func (this *Session) PlaceMarketOrder(marketOrder IRequest, callback OrderCallback) (string, error) { return this.order(marketOrder, callback, "PlaceMarketOrder") } func (this *Session) PlaceLimitOrder(marketOrder IRequest, callback OrderCallback) (string, error) { return this.order(marketOrder, callback, "LimitOrder") } func (this *Session) PlaceStopOrder(marketOrder IRequest, callback OrderCallback) (string, error) { return this.order(marketOrder, callback, "StopOrder") } func (this *Session) CancelOrder(cancelOrder IRequest, callback OrderCallback) (string, error) { return this.order(cancelOrder, callback, "CancelOrder") } func (this *Session) PlaceClosingOrder(order *request.ClosingOrderRequest, callback OrderCallback) (string, error) { return this.order(order, callback, "ClosingOrder") } func (this *Session) PlaceAmendStopsOrder(order *request.AmendStopsOrderRequest, callback OrderCallback) (string, error) { return this.order(order, callback, "AmendStopsOrder") } func (this *Session) order(marketOrder IRequest, callback OrderCallback, op string) (string, error) { return this.asynOrderCallback(callback, func() (string, error) { bodyData, reqErr := this.sessionActionOkStatus(marketOrder, "POST", op) if reqErr != nil { return "", reqErr } return util.ParseXmlNameData(bodyData, []string{"res", "body", "instructionId"}), nil }) } func (this *Session) OrderActions(order *OrderActionsRequest, callback OrderCallback) (string, error) { return this.order(order, callback, "OrderActions") } //它应该和Login 一起启动 //并且随着session的stop而停止 func (this *Session) HeartbeatTimeout(t time.Duration) { req2 := request.NewHeartbeatSubscriptionRequest() this.Subscribe(req2, nil) first := true c := time.NewTicker(t) this.runLock.Lock() if this.hearttick != nil { this.hearttick.Stop() //停止原来的heartbeat this.heartstop <- 1 } this.hearttick = c this.runLock.Unlock() go func() { tracelog.Println("start heart beat.") for { select { case <-c.C: tracelog.Println("heart beat check begin.") reciveTime := this.LastRecvTime() //第一次不检查,要先让心跳发出去 tracelog.Println("heart beat offset", time.Now().Sub(reciveTime), reciveTime) if !first && time.Now().Sub(reciveTime) > t+1*time.Second { err := NewOpError("Stream", errors.New("Heartbeat timeout"), -1, true) this.eventHandler.HandleEventError(err) } first = false req3 := request.NewHeartbeatRequest("hello") this.RequestHeartbeat(req3, func(str string, err error) { tracelog.Println("RequestHeartbeat", str, err) }) tracelog.Println("heart beat check end.") case <-this.heartstop: tracelog.Println("end heart beat.") return } } }() } func (this *Session) RequestHeartbeat(heartbeatRequest *request.HeartbeatRequest, callback HeartbeatCallback) (string, error) { return this.asynHeartbeatCallback(callback, func() (string, error) { bodyData, reqErr := this.sessionActionOkStatus(heartbeatRequest, "POST", "Heartbeat") if reqErr != nil { return "", reqErr } return util.ParseXmlNameData(bodyData, []string{"res", "body", "token"}), nil }) } func (this *Session) RequestHistoricMarketData(request IRequest, callback Callback) error { return this.asynCallback(callback, func() error { _, err := this.sessionActionOkStatus(request, "POST", "RequestHistorMarketData") return err }) } func (this *Session) Isrunning() bool { this.runLock.RLock() defer this.runLock.RUnlock() return this.running } func (this *Session) Stop() { this.runLock.Lock() defer this.runLock.Unlock() if !this.running { return } if this.hearttick != nil { this.hearttick.Stop() } this.running = false this.stopStream() } func (this *Session) stopStream() { if this.stream != nil && this.stream.Body != nil { //关闭stream 连接 this.stream.Body.Close() } } //仅仅重启一下stream,不重新登录 func (this *Session) StopStream() { this.runLock.Lock() defer this.runLock.Unlock() this.stopStream() } func (this *Session) RegisterAccountStateEvent(cb AccountStateListener) { this.eventHandler.OnAccountStateEvent = cb } func (this *Session) RegisterInitEvent(cb InitListener) { this.eventHandler.OnInit = cb } func (this *Session) RegisterOneExecutionEvent(cb OneExecutionListener) { this.eventHandler.OnOneExecution = cb } func (this *Session) RegisterExecutionEvent(cb ExecutionListener) { this.eventHandler.OnExecutionEvent = cb } func (this *Session) RegisterHeartbeatEvent(cb HeartbeatListener) { this.eventHandler.OnHeartbeatEvent = cb } func (this *Session) RegisterHistoricMarketDataEvent(cb HistoricMarketDataListener) { this.eventHandler.OnHistoricEvent = cb } func (this *Session) RegisterInstructionRejectedEvent(cb InstructionRejectedListener) { this.eventHandler.OnInstructionRejected = cb } func (this *Session) RegisterOrderBookEvent(cb OrderBookListener) { this.eventHandler.OnOrderBookEvent = cb } func (this *Session) RegisterOrderBookStatusEvent(cb OrderBookStatusListener) { this.eventHandler.OnOrderBookStatusEvent = cb } func (this *Session) RegisterOrderEvent(cb OrderListener) { this.eventHandler.OnOrderEvent = cb } func (this *Session) RegisterPositionEvent(cb PositionListener) { this.eventHandler.OnPositionEvent = cb } func (this *Session) RegisterExchangeRateEvent(cb ExchangeRateListener) { this.eventHandler.OnExchangeRateEvent = cb } func (this *Session) RegisterStreamFailureEvent(cb StreamFailureListener) { this.eventHandler.OnStreamFailure = cb } func (this *Session) RegisterSessionDisconnected(cb SessionDisconnectedListener) { this.eventHandler.OnSessionDisconnected = cb } func (this *Session) GetAccountDetails() *response.AccountDetails { this.mu.Lock() defer this.mu.Unlock() return this.accountDetails } func (this *Session) KeepAlive(d time.Duration) { req := &request.KeepAliveRequest{} for this.Isrunning() { _, err := this.sessionAction(req, "GET", "KeepAlive") if err != nil { // fmt.Println(err) } fmt.Println("KeepAlive sleep") time.Sleep(d) } } func (this *Session) OpenUrl(url string, callback UrlCallback) (*http.Response, error) { return this.asynUrlCallback(callback, func() (*http.Response, error) { this.mu.Lock() sid := this.sessionId this.mu.Unlock() res, err := httpOpen(url, sid) if err != nil { code := 0 if res != nil { code = res.StatusCode } return res, NewOpError("OpenUrl", err, code, true) } return res, nil }) } func (this *Session) sessionActionOkStatus(req IRequest, method string, op string) (string, error) { res, reqErr := this.sessionAction(req, method, op) if reqErr != nil { return "", reqErr } defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) bodyData := string(buf) statusErr := parseOKStatusString(bodyData, op, res.StatusCode) if statusErr != nil { return "", statusErr } return bodyData, nil } func parseOKStatusString(data string, op string, code int) error { // tracelog.Println("[", op, "]", data) status := util.ParseXmlNameData(data, []string{"res", "header", "status"}) if status != "OK" { opErr := NewOpError(op, errors.New("Error status:"+status+data), code, false) return opErr } return nil } func (this *Session) sessionAction(req IRequest, method string, op string) (*http.Response, error) { var res *http.Response var err error this.mu.Lock() sid := this.sessionId this.mu.Unlock() if method == "POST" { res, err = httpPost(this.lmax_url, sid, req) } else { res, err = httpGet(this.lmax_url, sid, req) } if err != nil { code := 0 if res != nil { code = res.StatusCode } return nil, NewOpError(op, err, code, true) } //这两个操作一般在初始化的时候完成,不会频繁操作,可以防止初始化的时候,因为这两个操作太慢而超时 if op == "SearchInstrument" || op == "Subscribe" { this.UpdateRecvTime() } return res, nil }