Session.go 18 KB


  1. package lmaxapi
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "tickserver/api/lmaxapi/request"
  8. "tickserver/api/lmaxapi/response"
  9. "tickserver/api/lmaxapi/util"
  10. "log"
  11. "net/http"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. var tracelog = log.New(ioutil.Discard, "[trace]", log.Ltime)
  17. type Session struct {
  18. lmax_url, sessionId string
  19. running bool
  20. runLock sync.RWMutex
  21. mu sync.RWMutex
  22. hearttick *time.Ticker
  23. eventHandler *EventHandler
  24. asynCallWait sync.WaitGroup
  25. accountDetails *response.AccountDetails
  26. stream *http.Response
  27. lastRecvTime time.Time
  28. orderBook *OrderBook
  29. heartstop chan int
  30. }
  31. func EnableLog(w io.Writer) {
  32. tracelog = log.New(w, "[trace]", log.Ldate|log.Lmicroseconds)
  33. }
  34. func DisableLog() {
  35. tracelog = log.New(ioutil.Discard, "[trace]", log.Ldate|log.Lmicroseconds)
  36. }
  37. func NewSession(url string, sid string, account *response.AccountDetails) *Session {
  38. //log.Println("NewSession")
  39. s := Session{lmax_url: url, sessionId: sid, running: false, accountDetails: account}
  40. s.lastRecvTime = time.Now()
  41. s.eventHandler = NewEventHandler(&s)
  42. mtf := NewMtf()
  43. go mtf.Start()
  44. s.orderBook = NewOrderBook(mtf)
  45. s.heartstop = make(chan int)
  46. return &s
  47. }
  48. func (this *Session) Login(username string, password string, productType string) error {
  49. req := request.NewLoginRequest(username, password, productType)
  50. session, err := Login(req)
  51. if err != nil {
  52. return err
  53. }
  54. session.orderBook.Stop()
  55. this.mu.Lock()
  56. this.lmax_url = session.lmax_url
  57. this.sessionId = session.sessionId
  58. this.accountDetails = session.accountDetails
  59. this.mu.Unlock()
  60. this.UpdateRecvTime()
  61. return nil
  62. }
  63. func (this *Session) IsLogin() bool {
  64. this.mu.Lock()
  65. defer this.mu.Unlock()
  66. return this.sessionId != ""
  67. }
  68. func (this *Session) GetOrderBook() *OrderBook {
  69. return this.orderBook
  70. }
  71. func (this *Session) LoadAllInstruments(cb func(inst *response.Instrument)) ([]*response.Instrument, error) {
  72. offset := int64(0)
  73. var ret []*response.Instrument
  74. for {
  75. searchReq := request.NewSearchInstrumentRequest("", offset)
  76. insList, hasMore, err := this.SearchInstruments(searchReq, nil)
  77. if err != nil {
  78. return nil, err
  79. }
  80. if cb != nil {
  81. for _, value := range insList {
  82. cb(value)
  83. }
  84. }
  85. if !hasMore {
  86. ret = append(ret, insList...)
  87. return ret, nil
  88. }
  89. ret = append(ret, insList...)
  90. offset = insList[len(insList)-1].Id
  91. }
  92. panic("never reached.")
  93. }
  94. func (this *Session) UpdateRecvTime() {
  95. this.mu.Lock()
  96. defer this.mu.Unlock()
  97. this.lastRecvTime = time.Now()
  98. }
  99. func (this *Session) LastRecvTime() time.Time {
  100. this.mu.Lock()
  101. defer this.mu.Unlock()
  102. return this.lastRecvTime
  103. }
  104. func (this *Session) Logout(callback Callback) error {
  105. return this.asynCallback(callback, func() error {
  106. req := request.NewLogoutRequest()
  107. res, reqerr := this.sessionAction(req, "POST", "Logout")
  108. if reqerr != nil {
  109. return reqerr
  110. }
  111. defer res.Body.Close()
  112. buf, _ := ioutil.ReadAll(res.Body)
  113. bufStr := string(buf)
  114. err := parseOKStatusString(bufStr, "Logout", res.StatusCode)
  115. return err
  116. })
  117. }
  118. func (this *Session) Subscribe(req IRequest, callback Callback) error {
  119. return this.asynCallback(callback, func() error {
  120. res, reqerr := this.sessionAction(req, "POST", "Subscribe")
  121. if reqerr != nil {
  122. return reqerr
  123. }
  124. defer res.Body.Close()
  125. buf, _ := ioutil.ReadAll(res.Body)
  126. bufStr := string(buf)
  127. err := parseOKStatusString(bufStr, "Subscribe", res.StatusCode)
  128. return err
  129. })
  130. }
  131. func (this *Session) Unsubscribe(req IRequest, callback Callback) error {
  132. return this.asynCallback(callback, func() error {
  133. req.(UrlSetter).SetUrl("/secure/unsubscribe")
  134. res, reqerr := this.sessionAction(req, "POST", "Unsubscribe")
  135. if reqerr != nil {
  136. return reqerr
  137. }
  138. defer res.Body.Close()
  139. buf, _ := ioutil.ReadAll(res.Body)
  140. bufStr := string(buf)
  141. err := parseOKStatusString(bufStr, "Subscribe", res.StatusCode)
  142. return err
  143. })
  144. }
  145. func (this *Session) SearchInstruments(req IRequest, searchCallback SearchInstrumentCallback) ([]*response.Instrument, bool, error) {
  146. return this.asynSearchInstrumentCallback(searchCallback, func() ([]*response.Instrument, bool, error) {
  147. res, reqErr := this.sessionAction(req, "GET", "SearchInstrument")
  148. if reqErr != nil {
  149. return nil, false, reqErr
  150. }
  151. defer res.Body.Close()
  152. buf, _ := ioutil.ReadAll(res.Body)
  153. bodyData := string(buf)
  154. statusErr := parseOKStatusString(bodyData, "SearchInstruments", res.StatusCode)
  155. if statusErr != nil {
  156. return nil, false, statusErr
  157. }
  158. insts := response.NewInstruments(bodyData)
  159. return insts.Data, insts.HasMoreResults, nil
  160. })
  161. }
  162. func (this *Session) GetCompletedOrder(req IRequest, callback CompletedOrderCallback) ([]*response.CompletedOrder, string, error) {
  163. return this.asynGetCompletedOrder(callback, func() ([]*response.CompletedOrder, string, error) {
  164. res, reqErr := this.sessionAction(req, "GET", "CompletedOrder")
  165. if reqErr != nil {
  166. return nil, "", reqErr
  167. }
  168. defer res.Body.Close()
  169. buf, _ := ioutil.ReadAll(res.Body)
  170. bodyData := string(buf)
  171. statusErr := parseOKStatusString(bodyData, "CompletedOrder", res.StatusCode)
  172. if statusErr != nil {
  173. return nil, "", statusErr
  174. }
  175. insts, err := response.NewCompletedOrders(bodyData)
  176. if err != nil {
  177. return nil, "", err
  178. }
  179. if insts.HasMoreResults == false {
  180. return insts.Data, "", nil
  181. }
  182. offset := insts.Data[len(insts.Data)-1].OrderId
  183. return insts.Data, offset, nil
  184. })
  185. }
  186. func (this *Session) GetAccountStatement(req IRequest, callback AccountStatementCallback) ([]*response.AccountStatement, bool, error) {
  187. return this.asynGetAccountStatement(callback, func() ([]*response.AccountStatement, bool, error) {
  188. res, reqErr := this.sessionAction(req, "GET", "AccountStatement")
  189. if reqErr != nil {
  190. return nil, false, reqErr
  191. }
  192. defer res.Body.Close()
  193. buf, _ := ioutil.ReadAll(res.Body)
  194. bodyData := string(buf)
  195. statusErr := parseOKStatusString(bodyData, "AccountStatement", res.StatusCode)
  196. if statusErr != nil {
  197. return nil, false, statusErr
  198. }
  199. insts, err := response.NewAccountStatements(bodyData)
  200. if err != nil {
  201. return nil, false, err
  202. }
  203. return insts.Data, insts.HasMoreResults, nil
  204. })
  205. }
  206. func (this *Session) GetOrderTransaction(req IRequest, callback OrderTransactionCallback) ([]*response.OrderTransaction, bool, error) {
  207. return this.asynGetOrderTransaction(callback, func() ([]*response.OrderTransaction, bool, error) {
  208. res, reqErr := this.sessionAction(req, "GET", "OrderTransaction")
  209. if reqErr != nil {
  210. return nil, false, reqErr
  211. }
  212. defer res.Body.Close()
  213. buf, _ := ioutil.ReadAll(res.Body)
  214. bodyData := string(buf)
  215. statusErr := parseOKStatusString(bodyData, "OrderTransaction", res.StatusCode)
  216. if statusErr != nil {
  217. return nil, false, statusErr
  218. }
  219. insts, err := response.NewOrderTransactions(bodyData)
  220. if err != nil {
  221. return nil, false, err
  222. }
  223. return insts.Data, insts.HasMoreResults, nil
  224. })
  225. }
  226. func (this *Session) GetActivity(req IRequest, callback ActivityCallback) ([]*response.Activity, bool, error) {
  227. return this.asynGetActivity(callback, func() ([]*response.Activity, bool, error) {
  228. _, ok := req.(*request.OrderActivityAuditTrailRequest)
  229. method := "GET"
  230. action := "Activity"
  231. if ok {
  232. method = "POST"
  233. action = "OrderActivityAuditTrailRequest"
  234. }
  235. res, reqErr := this.sessionAction(req, method, action)
  236. if reqErr != nil {
  237. return nil, false, reqErr
  238. }
  239. defer res.Body.Close()
  240. buf, _ := ioutil.ReadAll(res.Body)
  241. bodyData := string(buf)
  242. statusErr := parseOKStatusString(bodyData, action, res.StatusCode)
  243. if statusErr != nil {
  244. return nil, false, statusErr
  245. }
  246. insts, err := response.NewActivitys(bodyData)
  247. if err != nil {
  248. return nil, false, err
  249. }
  250. return insts.Data, insts.HasMoreResults, nil
  251. })
  252. }
  253. //compare and set 整个操作要原子
  254. func (this *Session) Start() error {
  255. tracelog.Println("begin start")
  256. this.runLock.Lock()
  257. if this.running { //如果已经running了
  258. this.runLock.Unlock()
  259. return NewOpError("Stream", errors.New("Error stream status,stream is running"), 0, false)
  260. }
  261. this.running = true //running 设置成true
  262. this.runLock.Unlock()
  263. streamReq := request.NewStreamRequest()
  264. //start 停止的条件:1. 发生403错误 2. 被外界stop
  265. for this.Isrunning() {
  266. res, err := this.sessionAction(streamReq, "POST", "Stream")
  267. this.runLock.Lock()
  268. this.stream = res //这里要加锁,stream 会被stop 函数在多线程环境下使用
  269. this.runLock.Unlock()
  270. if err != nil {
  271. this.eventHandler.HandleEventError(err)
  272. continue
  273. }
  274. //检查状态码,这个时候是session 过期了,要重新登录
  275. if this.stream.StatusCode == 403 {
  276. this.eventHandler.HandleEventSessionDisconnected()
  277. // this.Stop()
  278. return NewOpError("Stream", errors.New("Error ssesion is out of date"), 403, true)
  279. }
  280. //处理事件
  281. tracelog.Println("begin processStreamResponse")
  282. err = this.processStreamResponse()
  283. tracelog.Println("end processStreamResponse")
  284. if err != nil {
  285. this.eventHandler.HandleEventError(NewOpError("Stream", err, res.StatusCode, true))
  286. }
  287. }
  288. tracelog.Println("end start")
  289. time.Sleep(time.Second)
  290. return nil
  291. }
  292. func (this *Session) processStreamResponse() error {
  293. //保证:连接会执行close
  294. res := this.stream
  295. defer res.Body.Close()
  296. buf := make([]byte, 256) //这块内存应该能够应付最大的<events></events>
  297. var bodyData, nodeName, nodeData string
  298. var n int
  299. var err error
  300. for this.Isrunning() {
  301. n, err = res.Body.Read(buf)
  302. if err != nil {
  303. break
  304. }
  305. this.UpdateRecvTime()
  306. str := string(buf[0:n])
  307. bodyData = bodyData + str
  308. for {
  309. nodeName, nodeData, bodyData = util.ParseXmlNode(bodyData)
  310. if nodeName == "" {
  311. break
  312. }
  313. switch nodeName {
  314. case "events":
  315. index := strings.Index(nodeData, "<header><seq>0</seq></header>")
  316. if index != -1 {
  317. this.eventHandler.HandleEventData(nodeData, true)
  318. } else {
  319. this.eventHandler.HandleEventData(nodeData, false)
  320. }
  321. default:
  322. // fmt.Println("eventData:", nodeData)
  323. }
  324. }
  325. }
  326. return err
  327. }
  328. func (this *Session) PlaceMarketOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
  329. return this.order(marketOrder, callback, "PlaceMarketOrder")
  330. }
  331. func (this *Session) PlaceLimitOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
  332. return this.order(marketOrder, callback, "LimitOrder")
  333. }
  334. func (this *Session) PlaceStopOrder(marketOrder IRequest, callback OrderCallback) (string, error) {
  335. return this.order(marketOrder, callback, "StopOrder")
  336. }
  337. func (this *Session) CancelOrder(cancelOrder IRequest, callback OrderCallback) (string, error) {
  338. return this.order(cancelOrder, callback, "CancelOrder")
  339. }
  340. func (this *Session) PlaceClosingOrder(order *request.ClosingOrderRequest, callback OrderCallback) (string, error) {
  341. return this.order(order, callback, "ClosingOrder")
  342. }
  343. func (this *Session) PlaceAmendStopsOrder(order *request.AmendStopsOrderRequest, callback OrderCallback) (string, error) {
  344. return this.order(order, callback, "AmendStopsOrder")
  345. }
  346. func (this *Session) order(marketOrder IRequest, callback OrderCallback, op string) (string, error) {
  347. return this.asynOrderCallback(callback, func() (string, error) {
  348. bodyData, reqErr := this.sessionActionOkStatus(marketOrder, "POST", op)
  349. if reqErr != nil {
  350. return "", reqErr
  351. }
  352. return util.ParseXmlNameData(bodyData, []string{"res", "body", "instructionId"}), nil
  353. })
  354. }
  355. func (this *Session) OrderActions(order *OrderActionsRequest, callback OrderCallback) (string, error) {
  356. return this.order(order, callback, "OrderActions")
  357. }
  358. //它应该和Login 一起启动
  359. //并且随着session的stop而停止
  360. func (this *Session) HeartbeatTimeout(t time.Duration) {
  361. req2 := request.NewHeartbeatSubscriptionRequest()
  362. this.Subscribe(req2, nil)
  363. first := true
  364. c := time.NewTicker(t)
  365. this.runLock.Lock()
  366. if this.hearttick != nil {
  367. this.hearttick.Stop()
  368. //停止原来的heartbeat
  369. this.heartstop <- 1
  370. }
  371. this.hearttick = c
  372. this.runLock.Unlock()
  373. go func() {
  374. tracelog.Println("start heart beat.")
  375. for {
  376. select {
  377. case <-c.C:
  378. tracelog.Println("heart beat check begin.")
  379. reciveTime := this.LastRecvTime()
  380. //第一次不检查,要先让心跳发出去
  381. tracelog.Println("heart beat offset", time.Now().Sub(reciveTime), reciveTime)
  382. if !first && time.Now().Sub(reciveTime) > t+1*time.Second {
  383. err := NewOpError("Stream", errors.New("Heartbeat timeout"), -1, true)
  384. this.eventHandler.HandleEventError(err)
  385. }
  386. first = false
  387. req3 := request.NewHeartbeatRequest("hello")
  388. this.RequestHeartbeat(req3, func(str string, err error) {
  389. tracelog.Println("RequestHeartbeat", str, err)
  390. })
  391. tracelog.Println("heart beat check end.")
  392. case <-this.heartstop:
  393. tracelog.Println("end heart beat.")
  394. return
  395. }
  396. }
  397. }()
  398. }
  399. func (this *Session) RequestHeartbeat(heartbeatRequest *request.HeartbeatRequest, callback HeartbeatCallback) (string, error) {
  400. return this.asynHeartbeatCallback(callback, func() (string, error) {
  401. bodyData, reqErr := this.sessionActionOkStatus(heartbeatRequest, "POST", "Heartbeat")
  402. if reqErr != nil {
  403. return "", reqErr
  404. }
  405. return util.ParseXmlNameData(bodyData, []string{"res", "body", "token"}), nil
  406. })
  407. }
  408. func (this *Session) RequestHistoricMarketData(request IRequest, callback Callback) error {
  409. return this.asynCallback(callback, func() error {
  410. _, err := this.sessionActionOkStatus(request, "POST", "RequestHistorMarketData")
  411. return err
  412. })
  413. }
  414. func (this *Session) Isrunning() bool {
  415. this.runLock.RLock()
  416. defer this.runLock.RUnlock()
  417. return this.running
  418. }
  419. func (this *Session) Stop() {
  420. this.runLock.Lock()
  421. defer this.runLock.Unlock()
  422. if !this.running {
  423. return
  424. }
  425. if this.hearttick != nil {
  426. this.hearttick.Stop()
  427. }
  428. this.running = false
  429. this.stopStream()
  430. }
  431. func (this *Session) stopStream() {
  432. if this.stream != nil && this.stream.Body != nil {
  433. //关闭stream 连接
  434. this.stream.Body.Close()
  435. }
  436. }
  437. //仅仅重启一下stream,不重新登录
  438. func (this *Session) StopStream() {
  439. this.runLock.Lock()
  440. defer this.runLock.Unlock()
  441. this.stopStream()
  442. }
  443. func (this *Session) RegisterAccountStateEvent(cb AccountStateListener) {
  444. this.eventHandler.OnAccountStateEvent = cb
  445. }
  446. func (this *Session) RegisterInitEvent(cb InitListener) {
  447. this.eventHandler.OnInit = cb
  448. }
  449. func (this *Session) RegisterOneExecutionEvent(cb OneExecutionListener) {
  450. this.eventHandler.OnOneExecution = cb
  451. }
  452. func (this *Session) RegisterExecutionEvent(cb ExecutionListener) {
  453. this.eventHandler.OnExecutionEvent = cb
  454. }
  455. func (this *Session) RegisterHeartbeatEvent(cb HeartbeatListener) {
  456. this.eventHandler.OnHeartbeatEvent = cb
  457. }
  458. func (this *Session) RegisterHistoricMarketDataEvent(cb HistoricMarketDataListener) {
  459. this.eventHandler.OnHistoricEvent = cb
  460. }
  461. func (this *Session) RegisterInstructionRejectedEvent(cb InstructionRejectedListener) {
  462. this.eventHandler.OnInstructionRejected = cb
  463. }
  464. func (this *Session) RegisterOrderBookEvent(cb OrderBookListener) {
  465. this.eventHandler.OnOrderBookEvent = cb
  466. }
  467. func (this *Session) RegisterOrderBookStatusEvent(cb OrderBookStatusListener) {
  468. this.eventHandler.OnOrderBookStatusEvent = cb
  469. }
  470. func (this *Session) RegisterOrderEvent(cb OrderListener) {
  471. this.eventHandler.OnOrderEvent = cb
  472. }
  473. func (this *Session) RegisterPositionEvent(cb PositionListener) {
  474. this.eventHandler.OnPositionEvent = cb
  475. }
  476. func (this *Session) RegisterExchangeRateEvent(cb ExchangeRateListener) {
  477. this.eventHandler.OnExchangeRateEvent = cb
  478. }
  479. func (this *Session) RegisterStreamFailureEvent(cb StreamFailureListener) {
  480. this.eventHandler.OnStreamFailure = cb
  481. }
  482. func (this *Session) RegisterSessionDisconnected(cb SessionDisconnectedListener) {
  483. this.eventHandler.OnSessionDisconnected = cb
  484. }
  485. func (this *Session) GetAccountDetails() *response.AccountDetails {
  486. this.mu.Lock()
  487. defer this.mu.Unlock()
  488. return this.accountDetails
  489. }
  490. func (this *Session) KeepAlive(d time.Duration) {
  491. req := &request.KeepAliveRequest{}
  492. for this.Isrunning() {
  493. _, err := this.sessionAction(req, "GET", "KeepAlive")
  494. if err != nil {
  495. // fmt.Println(err)
  496. }
  497. fmt.Println("KeepAlive sleep")
  498. time.Sleep(d)
  499. }
  500. }
  501. func (this *Session) OpenUrl(url string, callback UrlCallback) (*http.Response, error) {
  502. return this.asynUrlCallback(callback, func() (*http.Response, error) {
  503. this.mu.Lock()
  504. sid := this.sessionId
  505. this.mu.Unlock()
  506. res, err := httpOpen(url, sid)
  507. if err != nil {
  508. code := 0
  509. if res != nil {
  510. code = res.StatusCode
  511. }
  512. return res, NewOpError("OpenUrl", err, code, true)
  513. }
  514. return res, nil
  515. })
  516. }
  517. func (this *Session) sessionActionOkStatus(req IRequest, method string, op string) (string, error) {
  518. res, reqErr := this.sessionAction(req, method, op)
  519. if reqErr != nil {
  520. return "", reqErr
  521. }
  522. defer res.Body.Close()
  523. buf, _ := ioutil.ReadAll(res.Body)
  524. bodyData := string(buf)
  525. statusErr := parseOKStatusString(bodyData, op, res.StatusCode)
  526. if statusErr != nil {
  527. return "", statusErr
  528. }
  529. return bodyData, nil
  530. }
  531. func parseOKStatusString(data string, op string, code int) error {
  532. // tracelog.Println("[", op, "]", data)
  533. status := util.ParseXmlNameData(data, []string{"res", "header", "status"})
  534. if status != "OK" {
  535. opErr := NewOpError(op, errors.New("Error status:"+status+data), code, false)
  536. return opErr
  537. }
  538. return nil
  539. }
  540. func (this *Session) sessionAction(req IRequest, method string, op string) (*http.Response, error) {
  541. var res *http.Response
  542. var err error
  543. this.mu.Lock()
  544. sid := this.sessionId
  545. this.mu.Unlock()
  546. if method == "POST" {
  547. res, err = httpPost(this.lmax_url, sid, req)
  548. } else {
  549. res, err = httpGet(this.lmax_url, sid, req)
  550. }
  551. if err != nil {
  552. code := 0
  553. if res != nil {
  554. code = res.StatusCode
  555. }
  556. return nil, NewOpError(op, err, code, true)
  557. }
  558. //这两个操作一般在初始化的时候完成,不会频繁操作,可以防止初始化的时候,因为这两个操作太慢而超时
  559. if op == "SearchInstrument" || op == "Subscribe" {
  560. this.UpdateRecvTime()
  561. }
  562. return res, nil
  563. }