general.go 14 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. // 本文件实现general数据源接口, 实时数据和历史数据的获取和保存
  4. import (
  5. "bytes"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. //"log"
  12. "net/http"
  13. "path"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "tickserver/framework/event"
  18. "tickserver/markinfo"
  19. "tickserver/server/market"
  20. "github.com/niniwzw/http2"
  21. )
  22. var tr = &http2.Transport{
  23. InsecureTLSDial: true,
  24. Timeout: 2000 * time.Millisecond,
  25. }
  26. // GeneralDS实现了dataSource接口, 并对general的历史数据和实时数据保存
  27. type GeneralDS struct {
  28. *market.DSBase
  29. cm *CandleMaker
  30. client *http.Client
  31. insPublisher event.EventPublisher
  32. insMap map[string]*market.Instrument
  33. idMappingMap map[int64]string
  34. typ string
  35. typId int
  36. url string
  37. st int64
  38. curId int64
  39. }
  40. type JsonResp2 struct {
  41. Result *json.RawMessage `json:"result"`
  42. Err string `json:"err"`
  43. Code int `json:"code"`
  44. }
  45. func NewGeneralDS(typ, url, fileserver, dataDir string, db *market.MyDB, bSSL bool) (*GeneralDS, error) {
  46. dir := dataDir + "/" + typ
  47. //now := time.Now()
  48. //st := time.Date(now.Year(), now.Month(), now.Day(), 6, 0, 0, 0, time.Local).Unix() * 1000
  49. typId := TypeId(typ)
  50. var client *http.Client
  51. if bSSL {
  52. client = &http.Client{Transport: tr}
  53. } else {
  54. client = &http.Client{}
  55. }
  56. gds := &GeneralDS{
  57. DSBase: market.NewDsBase(db, dir),
  58. insMap: make(map[string]*market.Instrument),
  59. idMappingMap: make(map[int64]string),
  60. typ: typ,
  61. typId: typId,
  62. url: url,
  63. //st: st,
  64. client: client,
  65. cm: &CandleMaker{
  66. typ: typ,
  67. typId: typId,
  68. url: url,
  69. dataDir: dataDir,
  70. fileserver: fileserver,
  71. db: db,
  72. client: client,
  73. },
  74. }
  75. gds.cm.gds = gds
  76. return gds, nil
  77. }
  78. func (gds *GeneralDS) SubIns() *event.Event {
  79. return gds.insPublisher.Event()
  80. }
  81. func (gds *GeneralDS) Run() {
  82. //log.Println("GeneralDS.Run, type:", gds.typ)
  83. /*goroutineNum := 64
  84. switch gds.typId {
  85. case IntLmax:
  86. goroutineNum = 8
  87. case IntEasyForex:
  88. goroutineNum = 4
  89. case IntCtp:
  90. goroutineNum = 32
  91. case IntDzh:
  92. goroutineNum = 64
  93. case IntTdx:
  94. goroutineNum = 512
  95. }*/
  96. go gds.DoReadEx()
  97. go gds.cm.run()
  98. //log.Println("getInss begin", gds.typ)
  99. gds.getInss()
  100. //log.Println("getInss end", gds.typ)
  101. //log.Println("load begin", gds.typ)
  102. gds.load()
  103. //log.Println("load end", gds.typ)
  104. //gds.download()
  105. //log.Println("stream begin", gds.typ)
  106. gds.stream()
  107. //log.Println("stream end", gds.typ)
  108. }
  109. func (gds *GeneralDS) load() {
  110. year := fmt.Sprintf("%d", time.Now().Year())
  111. for i, v := range gds.insMap {
  112. dir := path.Join(gds.cm.dataDir, gds.typ, i, year)
  113. //log.Println(dir)
  114. infos, err := ioutil.ReadDir(dir)
  115. if err != nil {
  116. //log.Println(err)
  117. continue
  118. }
  119. var curname string
  120. for j := 0; j < len(infos); j++ {
  121. if strings.HasSuffix(infos[j].Name(), ".tk.gz") {
  122. curname = infos[j].Name()
  123. }
  124. }
  125. tickfile := path.Join(dir, curname)
  126. ticks, _ := market.ReadTickFile(tickfile)
  127. //log.Println(tickfile, len(ticks))
  128. for _, tick := range ticks {
  129. var mk market.Market
  130. mk.InsId = v.Id
  131. mk.Timestamp = tick.Timestamp
  132. mk.Close = tick.Price
  133. mk.Low = tick.Price
  134. mk.High = tick.Price
  135. mk.Open = tick.Price
  136. mk.AllAmount = tick.Volume
  137. mk.AllVolume = tick.Volume
  138. mk.LastPrice = tick.Price
  139. mk.Volume = tick.Volume
  140. mk.Bids = make([]market.PP, 1)
  141. mk.Bids = append(mk.Bids, tick.Bid)
  142. mk.Asks = make([]market.PP, 1)
  143. mk.Asks = append(mk.Asks, tick.Ask)
  144. //time.Sleep(time.Millisecond)
  145. gds.SaveL(&mk)
  146. }
  147. }
  148. }
  149. func (gds *GeneralDS) getIns(id int64) (Instrument, error) {
  150. var ins Instrument
  151. req := &InstrumentRequest{Type: gds.typ, Id: id}
  152. body, err := httpReq(gds.client, "instrument", gds.url, req)
  153. if err != nil {
  154. //log.Println("httpreq", err)
  155. return ins, err
  156. }
  157. _, err = decodeResp(body, &ins)
  158. if err != nil {
  159. //log.Println("decodeResponse", err)
  160. return ins, err
  161. }
  162. return ins, nil
  163. }
  164. func (gds *GeneralDS) getInss() {
  165. for {
  166. req := &InstrumentsRequest{Type: gds.typ}
  167. body, err := httpReq(gds.client, "instruments", gds.url, req)
  168. if err != nil {
  169. //log.Println("httpreq", err)
  170. time.Sleep(10 * time.Second)
  171. continue
  172. }
  173. var inss []Instrument
  174. _, err = decodeResp(body, &inss)
  175. if err != nil {
  176. //log.Println("decodeResponse", err)
  177. return
  178. }
  179. for _, v := range inss {
  180. ins := tkIns2mkIns(gds.typ, v)
  181. //if tk.Type == IntSina {
  182. //log.Println("sssss", *ins)
  183. //}
  184. gds.idMappingMap[v.Id] = ins.Id
  185. gds.insMap[ins.Id] = ins
  186. gds.insPublisher.Publish(ins)
  187. }
  188. break
  189. }
  190. }
  191. func tkIns2mkIns(typ string, tIns Instrument) *market.Instrument {
  192. var insId string
  193. if typ == DataTypeName(IntCtp) {
  194. insId = typ + "_"
  195. intTyp := tIns.Id / 10000
  196. insTyp, _ := ctpTyps[int(intTyp)]
  197. insId += insTyp
  198. intSuffix := tIns.Id % 10000
  199. insId += strconv.Itoa(int(intSuffix))
  200. } else if typ == DataTypeName(IntEasyForex) || typ == DataTypeName(IntOanda) || typ == DataTypeName(IntBtc) || typ == DataTypeName(IntPolo) || typ == DataTypeName(IntBty) || typ == DataTypeName(IntCFix) || typ == DataTypeName(IntHuobi) || typ == DataTypeName(IntYunbi) || typ == DataTypeName(IntChbtc) {
  201. idStr, _ := markinfo.SymbolName(int(tIns.Id))
  202. insId = typ + "_" + idStr
  203. } else {
  204. if typ == Sina || typ == Dzh || typ == Tdx {
  205. insId = typ + "_" + fmt.Sprintf("%06d", tIns.Id)
  206. } else {
  207. insId = typ + "_" + fmt.Sprintf("%d", tIns.Id)
  208. }
  209. }
  210. ins := &market.Instrument{
  211. Id: insId,
  212. Name: tIns.Name,
  213. Typ: tIns.Type,
  214. ExId: tIns.ExId,
  215. PriceInc: tIns.PriceInc,
  216. Margin: tIns.Margin,
  217. StartTime: tIns.StartTime,
  218. EndTime: tIns.EndTime,
  219. }
  220. return ins
  221. }
  222. func (gds *GeneralDS) download() {
  223. var offset int
  224. num := 1000
  225. downstart := gds.st
  226. downid := gds.curId
  227. for num >= 1000 {
  228. req := &DownloadRequest{Type: gds.typ, Start: downstart, End: downid, Offset: offset, Count: 1000, OrderBy: "time asc"}
  229. //log.Println("download", req)
  230. body, err := httpReq(gds.client, "download", gds.url, req)
  231. if err != nil {
  232. //log.Println("httpReq", err)
  233. return
  234. }
  235. var ticks []*Market
  236. _, err = decodeResp(body, &ticks)
  237. if err != nil {
  238. //log.Println("decodeResp", err)
  239. return
  240. }
  241. //log.Println("download num:", len(ticks))
  242. for _, v := range ticks {
  243. if v.Type != int32(gds.typId) {
  244. //log.Println("download wrongggggggggg typ", v.Type, gds.typId)
  245. continue
  246. }
  247. //gds.st = v.Timestamp
  248. //gds.curId = v.InsId
  249. ins := gds.addIns(v.InsId)
  250. if nil == ins {
  251. continue
  252. }
  253. mk := tMk2mMk(*v, ins)
  254. //ins.SetMk(&mk)
  255. gds.Save(&mk)
  256. //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp)
  257. }
  258. //log.Println(ticks)
  259. num = len(ticks)
  260. offset += num
  261. }
  262. }
  263. func (gds *GeneralDS) stream() {
  264. for {
  265. req := &StreamRequest{Type: gds.typ}
  266. //log.Println("streamReq begin", gds.typ)
  267. body, err := streamReq(gds.client, "stream", gds.url, req)
  268. if err != nil {
  269. //log.Println("streamReq", err)
  270. time.Sleep(10 * time.Second)
  271. continue
  272. }
  273. //log.Println("streamReq end", gds.typ)
  274. decoder := json.NewDecoder(body)
  275. var tick Market
  276. for {
  277. //fmt.Printf("[O]")
  278. err = decoder.Decode(&tick)
  279. //fmt.Printf("[C]")
  280. if err != nil {
  281. body.Close()
  282. //log.Println("connect retry...", err)
  283. time.Sleep(time.Second * 1)
  284. break
  285. }
  286. if tick.Type != int32(gds.typId) {
  287. //log.Println("stream wrongggggggggg typ", tick.Type, gds.typId)
  288. continue
  289. }
  290. gds.st = tick.Timestamp
  291. gds.curId = tick.InsId
  292. ins := gds.addIns(tick.InsId)
  293. if nil == ins {
  294. continue
  295. }
  296. //if gds.typId == IntTdx && tick.InsId == 1 {
  297. //log.Println("[stream]data trace")
  298. //}
  299. mk := tMk2mMk(tick, ins)
  300. ins.SetMk(&mk)
  301. gds.Save(&mk)
  302. }
  303. }
  304. }
  305. func decodeResp(datain []byte, dataout interface{}) (int, error) {
  306. var resp JsonResp2
  307. err := json.Unmarshal(datain, &resp)
  308. if err != nil {
  309. return -1, err
  310. }
  311. if resp.Err != "" {
  312. return -1, errors.New(resp.Err)
  313. }
  314. if resp.Result == nil {
  315. return 0, nil
  316. }
  317. json.Unmarshal(*resp.Result, &dataout)
  318. return 0, nil
  319. }
  320. func httpReq(client *http.Client, name, url string, req interface{}) ([]byte, error) {
  321. s, err := json.Marshal(req)
  322. if err != nil {
  323. return nil, err
  324. }
  325. resp, err := client.Post(url+name, "text/json", bytes.NewBuffer(s))
  326. if err != nil {
  327. return nil, err
  328. }
  329. defer resp.Body.Close()
  330. body, err := ioutil.ReadAll(resp.Body)
  331. if err != nil {
  332. return nil, err
  333. }
  334. //log.Println(string(body))
  335. return body, nil
  336. }
  337. func streamReq(client *http.Client, name, url string, req interface{}) (io.ReadCloser, error) {
  338. s, err := json.Marshal(req)
  339. if err != nil {
  340. return nil, err
  341. }
  342. resp, err := client.Post(url+name, "text/json", bytes.NewBuffer(s))
  343. if err != nil {
  344. return nil, err
  345. }
  346. return resp.Body, err
  347. }
  348. func (gds *GeneralDS) getInsIdStr(intInsId int64) string {
  349. insIdStr, ok := gds.idMappingMap[intInsId]
  350. if ok {
  351. return insIdStr
  352. } else {
  353. return ""
  354. }
  355. }
  356. func (gds *GeneralDS) addIns(insId int64) *market.Instrument {
  357. insIdStr, ok := gds.idMappingMap[insId]
  358. if ok {
  359. ins, ok1 := gds.insMap[insIdStr]
  360. if !ok1 {
  361. //log.Println("addIns wrongggggggggg idmapping", gds.typ, insId, insIdStr)
  362. }
  363. return ins
  364. } else {
  365. tmpIns, err := gds.getIns(insId)
  366. if err != nil {
  367. //log.Println("addIns wrongggggggggg ins", gds.typ, insId, err)
  368. return nil
  369. }
  370. if tmpIns.Id != insId {
  371. //log.Println("addIns wrongggggggggg insId", gds.typ, insId, tmpIns.Id)
  372. return nil
  373. }
  374. ins := tkIns2mkIns(gds.typ, tmpIns)
  375. //if tk.Type == IntSina {
  376. //log.Println("sssss", insId, tmpIns, *ins)
  377. //}
  378. gds.insMap[ins.Id] = ins
  379. gds.idMappingMap[tmpIns.Id] = ins.Id
  380. gds.insPublisher.Publish(ins)
  381. return ins
  382. }
  383. return nil
  384. }
  385. func tMk2mMk(tick Market, ins *market.Instrument) market.Market {
  386. mk := market.Market{
  387. InsId: ins.Id,
  388. Timestamp: tick.Timestamp,
  389. Close: tick.Close,
  390. Open: tick.Open,
  391. High: tick.High,
  392. Low: tick.Low,
  393. AllVolume: tick.AllVolume,
  394. AllAmount: tick.AllAmount,
  395. LastPrice: tick.LastPrice,
  396. Volume: tick.LastVolume,
  397. }
  398. mk.Bids = make([]market.PP, len(tick.Bids))
  399. for i, v := range tick.Bids {
  400. mk.Bids[i][0] = v[0]
  401. mk.Bids[i][1] = v[1]
  402. }
  403. mk.Asks = make([]market.PP, len(tick.Asks))
  404. for i, v := range tick.Asks {
  405. mk.Asks[i][0] = v[0]
  406. mk.Asks[i][1] = v[1]
  407. }
  408. mk.SetIns(ins)
  409. return mk
  410. }
  411. const (
  412. jm = iota
  413. TC
  414. RM
  415. SR
  416. bu
  417. a
  418. ru
  419. i
  420. al
  421. y
  422. b
  423. zn
  424. fb
  425. j
  426. ag
  427. PM
  428. TA
  429. IF
  430. c
  431. pb
  432. l
  433. TF
  434. hc
  435. SF
  436. m
  437. fu
  438. wr
  439. CF
  440. RI
  441. v
  442. JR
  443. SM
  444. cu
  445. rb
  446. bb
  447. pp
  448. FG
  449. RS
  450. WH
  451. au
  452. jd
  453. p
  454. MA
  455. LR
  456. IH
  457. IC
  458. )
  459. var ctpTypMap map[string]int
  460. var ctpTyps = map[int]string{
  461. jm: "JM",
  462. TC: "TC",
  463. RM: "RM",
  464. SR: "SR",
  465. bu: "BU",
  466. a: "A",
  467. ru: "RU",
  468. i: "I",
  469. al: "AL",
  470. y: "Y",
  471. b: "B",
  472. zn: "ZN",
  473. fb: "FB",
  474. j: "J",
  475. ag: "AG",
  476. PM: "PM",
  477. TA: "TA",
  478. IF: "IF",
  479. c: "C",
  480. pb: "PB",
  481. l: "L",
  482. TF: "TF",
  483. hc: "HC",
  484. SF: "SF",
  485. m: "M",
  486. fu: "FU",
  487. wr: "WR",
  488. CF: "CF",
  489. RI: "RI",
  490. v: "V",
  491. JR: "JR",
  492. SM: "SM",
  493. cu: "CU",
  494. rb: "RB",
  495. bb: "BB",
  496. pp: "PP",
  497. FG: "FG",
  498. RS: "RS",
  499. WH: "WH",
  500. au: "AU",
  501. jd: "JD",
  502. p: "P",
  503. MA: "MA",
  504. LR: "LR",
  505. IH: "IH",
  506. IC: "IC",
  507. }
  508. type tPoint struct {
  509. hour, minute int
  510. }
  511. type tInterval struct {
  512. st, et tPoint
  513. }
  514. var shefTi = []tInterval{
  515. {tPoint{9, 0}, tPoint{10, 15}},
  516. {tPoint{10, 30}, tPoint{11, 30}},
  517. {tPoint{13, 30}, tPoint{15, 0}},
  518. {tPoint{21, 0}, tPoint{23, 59}},
  519. {tPoint{0, 0}, tPoint{2, 30}},
  520. }
  521. var decAndCzceTi = []tInterval{
  522. {tPoint{9, 0}, tPoint{10, 15}},
  523. {tPoint{10, 30}, tPoint{11, 30}},
  524. {tPoint{13, 30}, tPoint{15, 0}},
  525. }
  526. var cffexTi = []tInterval{
  527. {tPoint{9, 15}, tPoint{11, 30}},
  528. {tPoint{13, 0}, tPoint{15, 15}},
  529. }
  530. /*func (gds *GeneralDS) download() {
  531. var offset int
  532. num := 1000
  533. start := gds.st
  534. for num >= 1000 {
  535. req := &DownloadRequest{Type: gds.typ, Start: start, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"}
  536. log.Println("download", req)
  537. body, err := httpReq(gds.client, "download", gds.url, req)
  538. if err != nil {
  539. log.Println("httpReq", err)
  540. return
  541. }
  542. var ticks []*Market
  543. _, err = decodeResp(body, &ticks)
  544. if err != nil {
  545. log.Println("decodeResp", err)
  546. return
  547. }
  548. log.Println("download num:", len(ticks))
  549. for _, v := range ticks {
  550. if v.Type != int32(gds.typId) {
  551. log.Println("download wrongggggggggg typ", v.Type, gds.typId)
  552. continue
  553. }
  554. gds.st = v.Timestamp
  555. ins := gds.addIns(*v)
  556. if nil == ins {
  557. continue
  558. }
  559. mk := tMk2mMk(*v, ins)
  560. //ins.SetMk(&mk)
  561. gds.Save(&mk)
  562. //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp)
  563. }
  564. //log.Println(ticks)
  565. num = len(ticks)
  566. offset += num
  567. }
  568. }*/
  569. /*func (gds *GeneralDS) download() {
  570. var offset int
  571. var lastSt, lastTime int64
  572. bSorted := true
  573. num := 1000
  574. for num >= 1000 {
  575. if bSorted {
  576. if gds.st == lastSt {
  577. offset += num
  578. } else {
  579. offset = 0
  580. }
  581. } else {
  582. gds.st = lastSt
  583. offset += num
  584. }
  585. req := &DownloadRequest{Type: gds.typ, Start: gds.st, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"}
  586. log.Println("download", req)
  587. lastSt = gds.st
  588. body, err := httpReq(gds.client, "download", gds.url, req)
  589. if err != nil {
  590. log.Println("httpReq", err)
  591. return
  592. }
  593. var ticks []*Market
  594. _, err = decodeResp(body, &ticks)
  595. if err != nil {
  596. log.Println("decodeResp", err)
  597. return
  598. }
  599. log.Println("download num:", len(ticks))
  600. for _, v := range ticks {
  601. gds.st = v.Timestamp
  602. if gds.st < lastTime {
  603. bSorted = false
  604. log.Println("not sorted")
  605. }
  606. lastTime = gds.st
  607. ins := gds.addIns(*v)
  608. mk := tMk2mMk(*v, ins)
  609. ins.SetMk(&mk)
  610. gds.Save(&mk)
  611. //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp)
  612. }
  613. //log.Println(ticks)
  614. num = len(ticks)
  615. //offset += num
  616. }
  617. }*/