clientsimple.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package client
  3. // 本文件为简化客户端的实现, 对Server服务调用进行封装
  4. import (
  5. "bytes"
  6. "encoding/binary"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "log"
  13. "math/rand"
  14. "net"
  15. "net/http"
  16. "net/url"
  17. "os"
  18. "path"
  19. "sync"
  20. "time"
  21. )
  22. // Client对Server的rpc调用进行封装
  23. // 并保存下载的历史数据, 避免重复下载
  24. // 使用数据库管理下载的数据
  25. type ClientSimple struct {
  26. clientSub net.Conn // 由于服务器行情数据较多, 为避免和其他数据冲突, 采用新的连接单独处理行情数据
  27. enc encoder
  28. dec decoder
  29. insMap map[string]*Instrument // 产品列表
  30. httpComAddr, subAddr, httpAddr string // 服务器地址
  31. mapSubs map[string]int64
  32. dir string
  33. mu sync.Mutex
  34. }
  35. var instance *ClientSimple
  36. var mu sync.Mutex
  37. func GetClientInstance(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir string) (*ClientSimple, error) {
  38. mu.Lock()
  39. defer mu.Unlock()
  40. if instance != nil {
  41. return instance, nil
  42. }
  43. var err error
  44. instance, err = NewClientSimple(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir)
  45. if err != nil {
  46. instance = nil
  47. }
  48. return instance, err
  49. }
  50. //简化下载流程,去掉数据库,直接判断文件是否存在来查看本地是否有cache
  51. func NewClientSimple(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir string) (*ClientSimple, error) {
  52. rand.Seed(time.Now().UnixNano())
  53. c := &ClientSimple{
  54. insMap: make(map[string]*Instrument),
  55. httpComAddr: rpcAddr,
  56. subAddr: subAddr,
  57. httpAddr: httpAddr,
  58. dir: dir,
  59. mapSubs: make(map[string]int64),
  60. }
  61. err := c.connectSub(subAddr)
  62. if err != nil {
  63. return nil, err
  64. }
  65. err = c.getInsMap()
  66. if err != nil {
  67. return nil, err
  68. }
  69. go c.doInput()
  70. go c.doSubTest()
  71. return c, nil
  72. }
  73. func (c *ClientSimple) GetInsName(insId string) string {
  74. return c.insMap[insId].Name
  75. }
  76. func (c *ClientSimple) getInsMap() error {
  77. u := fmt.Sprintf("http://%s/instruments", c.httpComAddr)
  78. log.Println("beg", u)
  79. resp, err := http.Get(u)
  80. log.Println("end", u)
  81. if err != nil {
  82. return err
  83. }
  84. defer resp.Body.Close()
  85. data, err := ioutil.ReadAll(resp.Body)
  86. if err != nil {
  87. return err
  88. }
  89. json.Unmarshal(data, &c.insMap)
  90. return nil
  91. }
  92. func (c *ClientSimple) reConnectSub(err error) {
  93. c.clientSub.Close()
  94. for err != nil {
  95. log.Println("client.reConnectSub error:", err)
  96. err = c.connectSub(c.subAddr)
  97. time.Sleep(time.Second * 10)
  98. }
  99. for insId, h := range c.mapSubs {
  100. c.subMarket(insId, h)
  101. }
  102. }
  103. func (c *ClientSimple) connectSub(saddr string) error {
  104. clientSub, err := net.Dial("tcp", saddr)
  105. if err != nil {
  106. return err
  107. }
  108. enc := json.NewEncoder(clientSub)
  109. dec := json.NewDecoder(clientSub)
  110. c.clientSub = clientSub
  111. c.enc = enc
  112. c.dec = dec
  113. return nil
  114. }
  115. func (c *ClientSimple) GetInsMap() map[string]*Instrument {
  116. return c.insMap
  117. }
  118. func (c *ClientSimple) GetIns(insId string) *Instrument {
  119. return c.insMap[insId]
  120. }
  121. // downloadOne下载一个文件
  122. // 可以同时调用下载多个文件以增加下载效率
  123. // 参数done用于指明此函数调用是否完成
  124. func (c *ClientSimple) downloadOne(insId, date string, period int) (string, error) {
  125. c.mu.Lock()
  126. defer c.mu.Unlock()
  127. var u string
  128. if period == D1 {
  129. u = fmt.Sprintf("http://%s/%s/%s/D1.gz", c.httpAddr, InsIdPrefix(insId), insId)
  130. } else {
  131. var year, month, day int
  132. fmt.Sscanf(date, "%04d%02d%02d", &year, &month, &day)
  133. yearStr := fmt.Sprintf("%04d", year)
  134. periodStr := PeriodNameMap[period]
  135. if period == TK {
  136. periodStr = "tk"
  137. }
  138. u = fmt.Sprintf("http://%s/%s/%s/%s/%s.%s.gz", c.httpAddr, InsIdPrefix(insId), insId, yearStr, date, periodStr)
  139. }
  140. surl, err := url.Parse(u)
  141. if err != nil {
  142. return "", err
  143. }
  144. fname := path.Join(c.dir, surl.Path)
  145. dir := path.Dir(fname)
  146. os.MkdirAll(dir, 0777)
  147. if isExist(fname, date, period) {
  148. return fname, nil
  149. }
  150. res, err := http.Get(u)
  151. if err != nil {
  152. return "", err
  153. }
  154. defer res.Body.Close()
  155. w, err := os.Create(fname)
  156. if err != nil {
  157. return "", err
  158. }
  159. defer w.Close()
  160. _, err = io.Copy(w, res.Body)
  161. if err != nil {
  162. return "", err
  163. }
  164. return fname, nil
  165. }
  166. func isExist(fname, date string, period int) bool {
  167. if period == D1 {
  168. return false
  169. }
  170. if _, err := os.Stat(fname); os.IsNotExist(err) {
  171. return false
  172. }
  173. if period == TK {
  174. today := time.Now()
  175. todayDate := fmt.Sprintf("%04d%02d%02d", today.Year(), today.Month(), today.Day())
  176. yesterday := today.AddDate(0, 0, -1)
  177. yesterdayDate := fmt.Sprintf("%04d%02d%02d", yesterday.Year(), yesterday.Month(), yesterday.Day())
  178. if (date == todayDate) || (date == yesterdayDate) {
  179. return false
  180. } else {
  181. return true
  182. }
  183. }
  184. return true
  185. }
  186. func (c *ClientSimple) GetTickHistory(insId string, n int, ts int64) ([]Tick, error) {
  187. ticks, _, err := c.GetHistory(insId, 0, n, ts)
  188. return ticks, err
  189. }
  190. func (c *ClientSimple) GetCandleHistory(insId string, period, n int, ts int64) ([]Candle, error) {
  191. _, candles, err := c.GetHistory(insId, period, n, ts)
  192. return candles, err
  193. }
  194. func (c *ClientSimple) GetHis(insId string, period, n int, timestamp int64) (io.ReadCloser, error) {
  195. if timestamp < 0 {
  196. timestamp = time.Now().Unix() * 1000
  197. }
  198. if n == 0 {
  199. return nil, errors.New("no data needed.")
  200. }
  201. var tBegin int64
  202. if n < 0 {
  203. tBegin = 0
  204. }
  205. if n > 0 {
  206. tBegin = timestamp - 3600*24*1000
  207. }
  208. t := time.Unix(tBegin/1000, 0)
  209. periodStr := PeriodNameMap[period]
  210. if period == TK {
  211. periodStr = "tk"
  212. }
  213. u := fmt.Sprintf("http://%s/timelist?symbol=%s&period=%s&begin=%04d%02d%02d", c.httpComAddr, insId, periodStr, t.Year(), t.Month(), t.Day())
  214. resp, err := http.Get(u)
  215. if err != nil {
  216. return nil, err
  217. }
  218. defer resp.Body.Close()
  219. data, err := ioutil.ReadAll(resp.Body)
  220. if err != nil {
  221. return nil, err
  222. }
  223. var timelist []string
  224. json.Unmarshal(data, &timelist)
  225. if period == D1 {
  226. timelist = append(timelist, "xxx")
  227. }
  228. var fileIndex int
  229. if n < 0 {
  230. fileIndex = len(timelist) - 1
  231. tPoint := time.Unix(timestamp/1000+3600*24, 0)
  232. datePoint := fmt.Sprintf("%04d%02d%02d", tPoint.Year(), tPoint.Month(), tPoint.Day())
  233. for i, v := range timelist {
  234. if v >= datePoint {
  235. fileIndex = i
  236. break
  237. }
  238. }
  239. }
  240. return &HistoryResponse{
  241. clientsimple: c,
  242. timelist: timelist[:],
  243. insId: insId,
  244. period: period,
  245. n: n,
  246. ts: timestamp,
  247. fileIndex: fileIndex,
  248. buf: bytes.NewBuffer([]byte("")),
  249. }, nil
  250. }
  251. func (c *ClientSimple) GetHisEx(insId string, period int, st, et int64) (io.ReadCloser, error) {
  252. tBegin := st - 3600*24*1000
  253. t := time.Unix(tBegin/1000, 0)
  254. periodStr := PeriodNameMap[period]
  255. if period == TK {
  256. periodStr = "tk"
  257. }
  258. u := fmt.Sprintf("http://%s/timelist?symbol=%s&period=%s&begin=%04d%02d%02d", c.httpComAddr, insId, periodStr, t.Year(), t.Month(), t.Day())
  259. log.Println("down beg", u)
  260. resp, err := http.Get(u)
  261. log.Println("down end", u, err)
  262. if err != nil {
  263. return nil, err
  264. }
  265. defer resp.Body.Close()
  266. data, err := ioutil.ReadAll(resp.Body)
  267. if err != nil {
  268. return nil, err
  269. }
  270. var timelist []string
  271. json.Unmarshal(data, &timelist)
  272. if period == D1 {
  273. timelist = append(timelist, "xxx")
  274. }
  275. return &HistoryResponseEx{
  276. clientsimple: c,
  277. timelist: timelist[:],
  278. insId: insId,
  279. period: period,
  280. st: st,
  281. et: et,
  282. buf: bytes.NewBuffer([]byte("")),
  283. }, nil
  284. }
  285. //GetHistory函数封装了rpc的调用, 并对结果做相应的处理
  286. func (c *ClientSimple) GetHistory(insId string, period, nn int, ts int64) ([]Tick, []Candle, error) {
  287. if period < M1 {
  288. period = 0
  289. }
  290. resp, err := c.GetHis(insId, period, nn, ts)
  291. if err != nil {
  292. //log.Println("@@@@@", err)
  293. return nil, nil, err
  294. }
  295. if period == 0 {
  296. if nn < 0 {
  297. nn = -nn
  298. }
  299. var ticks []Tick
  300. for {
  301. var tick Tick
  302. err = binary.Read(resp, binary.LittleEndian, &tick)
  303. if err != nil {
  304. if err != io.EOF {
  305. log.Println(err)
  306. }
  307. break
  308. }
  309. ticks = append(ticks, tick)
  310. }
  311. if len(ticks) < nn {
  312. return ticks, nil, ErrNotEnough
  313. }
  314. return ticks, nil, nil
  315. }
  316. if nn < 0 {
  317. nn = -nn
  318. }
  319. var candles []Candle
  320. for {
  321. var candle Candle
  322. err = binary.Read(resp, binary.LittleEndian, &candle)
  323. if err != nil {
  324. if err != io.EOF {
  325. log.Println(err)
  326. }
  327. break
  328. }
  329. candles = append(candles, candle)
  330. }
  331. if len(candles) < nn {
  332. return nil, candles, ErrNotEnough
  333. }
  334. return nil, candles, nil
  335. }
  336. func (c *ClientSimple) GetHistoryEx(insId string, period int, st, et int64) ([]Tick, []Candle, error) {
  337. if period < M1 {
  338. period = 0
  339. }
  340. resp, err := c.GetHisEx(insId, period, st, et)
  341. if err != nil {
  342. log.Println("@@@@@", err)
  343. return nil, nil, err
  344. }
  345. if period == 0 {
  346. var ticks []Tick
  347. for {
  348. var tick Tick
  349. err = binary.Read(resp, binary.LittleEndian, &tick)
  350. if err != nil {
  351. if err != io.EOF {
  352. log.Println(err)
  353. }
  354. break
  355. }
  356. if tick.Timestamp < st {
  357. continue
  358. }
  359. if tick.Timestamp > et {
  360. break
  361. }
  362. ticks = append(ticks, tick)
  363. }
  364. return ticks, nil, nil
  365. }
  366. var candles []Candle
  367. for {
  368. var candle Candle
  369. err = binary.Read(resp, binary.LittleEndian, &candle)
  370. if err != nil {
  371. if err != io.EOF {
  372. log.Println(err)
  373. }
  374. break
  375. }
  376. if candle.Timestamp < st {
  377. continue
  378. }
  379. if candle.Timestamp > et {
  380. break
  381. }
  382. candles = append(candles, candle)
  383. }
  384. return nil, candles, nil
  385. }
  386. func (c *ClientSimple) GetLastTicks(insId string, n int) ([]Tick, error) {
  387. u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[TK])
  388. resp, err := http.Get(u)
  389. if err != nil {
  390. return nil, err
  391. }
  392. defer resp.Body.Close()
  393. data, err := ioutil.ReadAll(resp.Body)
  394. if err != nil {
  395. return nil, err
  396. }
  397. var ticks []Tick
  398. json.Unmarshal(data, &ticks)
  399. p := len(ticks) - n
  400. if p < 0 {
  401. p = 0
  402. }
  403. return ticks[p:], nil
  404. }
  405. func (c *ClientSimple) GetLastCandles(insId string, period, n int) ([]Candle, error) {
  406. u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[period])
  407. resp, err := http.Get(u)
  408. if err != nil {
  409. return nil, err
  410. }
  411. defer resp.Body.Close()
  412. data, err := ioutil.ReadAll(resp.Body)
  413. if err != nil {
  414. return nil, err
  415. }
  416. var candles []Candle
  417. json.Unmarshal(data, &candles)
  418. p := len(candles) - n
  419. if p < 0 {
  420. p = 0
  421. }
  422. return candles[p:], nil
  423. }
  424. func (c *ClientSimple) GetLastTicksByTime(insId string, st, et int64) ([]Tick, error) {
  425. u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[TK])
  426. resp, err := http.Get(u)
  427. if err != nil {
  428. return nil, err
  429. }
  430. defer resp.Body.Close()
  431. data, err := ioutil.ReadAll(resp.Body)
  432. if err != nil {
  433. return nil, err
  434. }
  435. var ticks []Tick
  436. json.Unmarshal(data, &ticks)
  437. tLen := len(ticks)
  438. if tLen <= 0 {
  439. return nil, nil
  440. }
  441. tSt := ticks[0].Timestamp
  442. tEt := ticks[tLen-1].Timestamp
  443. if st > tEt || et < tSt {
  444. return nil, nil
  445. }
  446. begin, end := -1, -1
  447. for i, v := range ticks {
  448. if v.Timestamp > et {
  449. break
  450. }
  451. if v.Timestamp >= st {
  452. if begin == -1 {
  453. begin = i
  454. } else {
  455. end = i
  456. }
  457. }
  458. }
  459. if begin != -1 && end != -1 {
  460. return ticks[begin:end], nil
  461. } else {
  462. return nil, nil
  463. }
  464. }
  465. func (c *ClientSimple) GetLastCandlesByTime(insId string, period int, st, et int64) ([]Candle, error) {
  466. u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[period])
  467. resp, err := http.Get(u)
  468. if err != nil {
  469. return nil, err
  470. }
  471. defer resp.Body.Close()
  472. data, err := ioutil.ReadAll(resp.Body)
  473. if err != nil {
  474. return nil, err
  475. }
  476. var candles []Candle
  477. json.Unmarshal(data, &candles)
  478. cLen := len(candles)
  479. if cLen <= 0 {
  480. return nil, nil
  481. }
  482. cSt := candles[0].Timestamp
  483. cEt := candles[cLen-1].Timestamp
  484. if st > cEt || et < cSt {
  485. return nil, nil
  486. }
  487. begin, end := -1, -1
  488. for i, v := range candles {
  489. if v.Timestamp > et {
  490. break
  491. }
  492. if v.Timestamp >= st {
  493. if begin == -1 {
  494. begin = i
  495. } else {
  496. end = i
  497. }
  498. }
  499. }
  500. if begin != -1 && end != -1 {
  501. return candles[begin:end], nil
  502. } else {
  503. return nil, nil
  504. }
  505. }
  506. func (c *ClientSimple) doSubTest() {
  507. t := time.Tick(time.Second * 30)
  508. for _ = range t {
  509. c.subMarket("0", 0)
  510. }
  511. }
  512. func (c *ClientSimple) doInput() {
  513. type MC struct {
  514. ch chan *Market
  515. }
  516. mcCh := make(chan *MC, 1024)
  517. for i := 0; i < 16; i++ {
  518. go func() {
  519. for {
  520. mc := <-mcCh
  521. select {
  522. case mk := <-mc.ch:
  523. ins, ok := c.insMap[mk.InsId]
  524. if !ok {
  525. break
  526. }
  527. mk.SetIns(ins)
  528. ins.SetMk(mk)
  529. //DebugDelay("doInput", mk.InsId, mk.Timestamp)
  530. default:
  531. time.Sleep(time.Microsecond)
  532. }
  533. mcCh <- mc
  534. }
  535. }()
  536. }
  537. mapCh := make(map[string]chan *Market)
  538. for {
  539. mk := &Market{}
  540. err := c.dec.Decode(mk)
  541. if err != nil {
  542. c.reConnectSub(err)
  543. continue
  544. }
  545. _, ok := mapCh[mk.InsId]
  546. if !ok {
  547. ch := make(chan *Market, 1)
  548. mcCh <- &MC{ch}
  549. mapCh[mk.InsId] = ch
  550. }
  551. select {
  552. case mapCh[mk.InsId] <- mk:
  553. default:
  554. }
  555. }
  556. }
  557. func (c *ClientSimple) subMarket(insId string, h int64) error {
  558. args := SubArgs{insId, h, false}
  559. return c.enc.Encode(args)
  560. }
  561. // 订阅指定产品的行情
  562. func (c *ClientSimple) SubMarket(insId string) {
  563. _, ok := c.mapSubs[insId]
  564. if ok {
  565. return
  566. }
  567. log.Println("Client.SubMarket", insId)
  568. h := time.Now().UnixNano() + rand.Int63()
  569. c.mapSubs[insId] = h
  570. c.subMarket(insId, h)
  571. }
  572. // 取消订阅
  573. func (c *ClientSimple) UnsubMarket(insId string) error {
  574. log.Println("Client.UnsubMarket", insId)
  575. h, ok := c.mapSubs[insId]
  576. if !ok {
  577. return errors.New("UnsubMarket error: insId is NOT in mapSubs " + insId)
  578. }
  579. delete(c.mapSubs, insId)
  580. args := SubArgs{insId, h, true}
  581. return c.enc.Encode(args)
  582. }
  583. func (c *ClientSimple) Close() {
  584. c.clientSub.Close()
  585. }