ds.go.1 11 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package market
  3. // 本文件实现DataSource数据源的tick数据获取下载和保存
  4. import (
  5. "errors"
  6. "fmt"
  7. "log"
  8. "os"
  9. "path"
  10. "strings"
  11. "sync"
  12. "time"
  13. "dev.33.cn/framework/base"
  14. "dev.33.cn/framework/event"
  15. )
  16. var Debug bool = false // for debug
  17. type InsName interface {
  18. GetInsName(insId string) string
  19. }
  20. // 数据源接口
  21. // lmax, easyforex, oanda, ctp以及未来的macoin等均实现此接口
  22. type DataSource interface {
  23. SubIns() *event.Event
  24. Run()
  25. GetLastTicks(insId string, n int) ([]Tick, error)
  26. GetLastCandles(insId string, period, n int) ([]Candle, error)
  27. }
  28. type DSBase struct {
  29. db *MyDB
  30. dir string
  31. chM chan *Market
  32. chId chan string
  33. cmu sync.Mutex // 保护camp
  34. cmap map[int]map[string]*candleBuf // 缓存最新的Candle
  35. tmu sync.Mutex // 保护tmap
  36. tmap map[string]*tickBuf // 缓存最新的Tick
  37. }
  38. func makeCandleMap() map[int]map[string]*candleBuf {
  39. cmap := make(map[int]map[string]*candleBuf)
  40. for k, _ := range basePeriodSet {
  41. cmap[k] = make(map[string]*candleBuf)
  42. }
  43. return cmap
  44. }
  45. func NewDsBase(db *MyDB, dir string) *DSBase {
  46. dsb := &DSBase{
  47. db: db,
  48. dir: dir,
  49. chId: make(chan string, 1),
  50. chM: make(chan *Market, 10240),
  51. cmap: makeCandleMap(),
  52. tmap: make(map[string]*tickBuf),
  53. }
  54. return dsb
  55. }
  56. func (dsb *DSBase) GetLastCandles(insId string, period, n int) ([]Candle, error) {
  57. log.Println("@@@: DSBase.GetLastCandles:", insId, period, n)
  58. defer log.Println("###: DSBase.GetLastCandles:", insId, period, n)
  59. buf, ok := dsb.getBuf(insId, period)
  60. if !ok {
  61. msg := fmt.Sprintf("GetLastCandles error: %s insId is NOT in fzm exchange", insId)
  62. return nil, errors.New(msg)
  63. }
  64. buf.Lock()
  65. defer buf.Unlock()
  66. p := len(buf.buf) - n
  67. if p < 0 {
  68. p = 0
  69. }
  70. return buf.buf[p:], nil
  71. }
  72. func (dsb *DSBase) GetLastTicks(insId string, n int) ([]Tick, error) {
  73. dsb.tmu.Lock()
  74. defer dsb.tmu.Unlock()
  75. buf, ok := dsb.tmap[insId]
  76. if !ok {
  77. msg := fmt.Sprintf("GetLastTicks error: %s insId is NOT in fzm exchange", insId)
  78. return nil, errors.New(msg)
  79. }
  80. buf.Lock()
  81. defer buf.Unlock()
  82. p := len(buf.buf) - n
  83. if p < 0 {
  84. p = 0
  85. }
  86. return buf.buf[p:], nil
  87. }
  88. func (dsb *DSBase) getBuf(insId string, period int) (*candleBuf, bool) {
  89. dsb.cmu.Lock()
  90. defer dsb.cmu.Unlock()
  91. bufMap, ok := dsb.cmap[period]
  92. if !ok {
  93. return nil, false
  94. }
  95. buf, ok := bufMap[insId]
  96. if !ok {
  97. return nil, false
  98. }
  99. return buf, true
  100. }
  101. // 删除不需要的insId
  102. func (dsb *DSBase) Del(insId string) {
  103. dsb.chId <- insId
  104. }
  105. func (dsb *DSBase) Save(m *Market) {
  106. select {
  107. case dsb.chM <- m:
  108. default:
  109. log.Println("@@@:Save:", m.InsId, m.LastPrice)
  110. }
  111. }
  112. type candleInfo struct {
  113. c Candle
  114. insId string
  115. period int
  116. }
  117. type readerInfo struct {
  118. r *candleBuffer
  119. insId string
  120. period int
  121. prev *Candle
  122. }
  123. // 每小时检查一次, 删除超过n数据
  124. func (dsb *DSBase) saveCheck() {
  125. ticker := time.Tick(time.Hour)
  126. for _ = range ticker {
  127. dsb.cmu.Lock()
  128. for _, m := range dsb.cmap {
  129. for _, buf := range m {
  130. n := 1440 // M1一天的, 其他周期不论
  131. buf.Lock()
  132. l := len(buf.buf)
  133. if l > n {
  134. buf.buf = buf.buf[l-n:]
  135. }
  136. buf.Unlock()
  137. }
  138. }
  139. dsb.cmu.Unlock()
  140. dsb.tmu.Lock()
  141. for insId, buf := range dsb.tmap {
  142. dsName := InsIdPrefix(insId)
  143. n := 0
  144. switch dsName {
  145. case Lmax, Saxo:
  146. n = 3600 * 10 * 6 // 大约缓存6小时
  147. case Ctp, Dzh:
  148. n = 3600 * 2 * 3 // 大约缓存3小时
  149. default:
  150. n = 8640 // easyforex等 大约1天
  151. }
  152. buf.Lock()
  153. l := len(buf.buf)
  154. if l > n {
  155. buf.buf = buf.buf[l-n:]
  156. }
  157. buf.Unlock()
  158. }
  159. dsb.tmu.Unlock()
  160. }
  161. }
  162. // 保存到内存并保存为文件
  163. func (dsb *DSBase) RunSave(n int) {
  164. log.Println("@@@:RunSave:", n)
  165. tcCh := make(chan *tconver, 40960) // 足够大,包括所有的品种
  166. go dsb.doRead(tcCh)
  167. for i := 0; i < n; i++ {
  168. go dsb.doConv(tcCh)
  169. }
  170. // 保存数据到文件
  171. if dsb.db != nil {
  172. if Debug { // 测试时每小时保存一次
  173. ticker := time.Tick(time.Hour)
  174. for _ = range ticker {
  175. dsb.doSave()
  176. }
  177. } else { // 每天定时保存
  178. ticker := time.Tick(time.Second * 30)
  179. for t := range ticker {
  180. if t.Hour() == 5 && t.Minute() == 0 { // 5:00点时保存
  181. dsb.doSave()
  182. dsb.saveCheck()
  183. }
  184. }
  185. }
  186. } else {
  187. dsb.saveCheck()
  188. }
  189. }
  190. type tbuf struct {
  191. tb *tickBuf
  192. insId string
  193. }
  194. type cbuf struct {
  195. cb *candleBuf
  196. insId string
  197. period int
  198. }
  199. // 把缓存数据写入文件, 所有K线周期数据都用缓存
  200. func (dsb *DSBase) doSave0() {
  201. tCh := make(chan tbuf, 8192)
  202. cCh := make(chan cbuf, 8192*4)
  203. for i := 0; i < 4; i++ {
  204. go func() {
  205. for {
  206. select {
  207. case t := <-tCh:
  208. tb := t.tb
  209. tb.Lock()
  210. dsb.saveTick(tb.buf, t.insId, true)
  211. tb.Unlock()
  212. case c := <-cCh:
  213. cb := c.cb
  214. cb.Lock()
  215. dsb.saveCandle(c.insId, c.period, cb.buf)
  216. cb.Unlock()
  217. }
  218. }
  219. }()
  220. }
  221. dsb.tmu.Lock()
  222. for insId, tb := range dsb.tmap {
  223. tCh <- tbuf{tb, insId}
  224. }
  225. dsb.tmu.Unlock()
  226. close(tCh)
  227. dsb.cmu.Lock()
  228. for period, m := range dsb.cmap {
  229. for insId, cb := range m {
  230. cCh <- cbuf{cb, insId, period}
  231. }
  232. }
  233. dsb.cmu.Unlock()
  234. close(cCh)
  235. }
  236. // 把缓存数据写入文件, K线数据使用tick转换
  237. func (dsb *DSBase) doSave() {
  238. ch := make(chan tbuf, 8192)
  239. for i := 0; i < 4; i++ {
  240. go func() {
  241. for t := range ch {
  242. tb := t.tb
  243. insId := t.insId
  244. fname, err := dsb.saveTick(tb.buf, insId, true)
  245. if err == nil {
  246. convAndSaveCandles(dsb.db, insId, fname, tb.buf)
  247. }
  248. }
  249. }()
  250. }
  251. dsb.tmu.Lock()
  252. for insId, tb := range dsb.tmap {
  253. ch <- tbuf{tb, insId}
  254. }
  255. dsb.tmu.Unlock()
  256. close(ch)
  257. }
  258. // 从chR中读出Candle, 周期转换, 存储到chC中
  259. func (dsb *DSBase) doConv0(chR chan *readerInfo) {
  260. for {
  261. ri := <-chR
  262. select {
  263. case ce := <-ri.r.ch:
  264. c, err := ce.candle, ce.err
  265. if err != nil {
  266. log.Println(err)
  267. break // break select
  268. }
  269. dsb.cmu.Lock()
  270. cbuf, ok := dsb.cmap[ri.period][ri.insId]
  271. dsb.cmu.Unlock()
  272. if !ok {
  273. break // break select
  274. }
  275. if cbuf.leng() == 0 {
  276. cbuf.add(c)
  277. }
  278. last := cbuf.last()
  279. if last.Timestamp != c.Timestamp {
  280. cbuf.add(c)
  281. } else {
  282. *last = *c
  283. }
  284. default:
  285. time.Sleep(time.Millisecond * 1)
  286. }
  287. chR <- ri
  288. }
  289. }
  290. // 从chR中读出Candle, 周期转换, 存储到chC中
  291. // tc.ch 保证了tick的顺序, 保证每个tconver(tc)的ch 同时只有一个goroutine在操作
  292. // 这样doConv就可以并发执行
  293. func (dsb *DSBase) doConv(tcCh chan *tconver) {
  294. for {
  295. tc := <-tcCh
  296. dsb.cmu.Lock()
  297. cbuf, ok := dsb.cmap[tc.period][tc.insId]
  298. dsb.cmu.Unlock()
  299. if ok {
  300. select {
  301. case t := <-tc.ch:
  302. if t == nil {
  303. break // break select
  304. }
  305. c := tc.convEx(t)
  306. if tc.period == M1 {
  307. //testCandle(0, nil, t, c, true)
  308. }
  309. for _, v := range c {
  310. if v == nil {
  311. break
  312. }
  313. if cbuf.leng() == 0 {
  314. cbuf.add(v)
  315. }
  316. last := cbuf.last()
  317. if last.Timestamp != v.Timestamp {
  318. cbuf.add(v)
  319. } else {
  320. *last = *v
  321. }
  322. }
  323. default:
  324. time.Sleep(time.Microsecond * 1)
  325. }
  326. }
  327. tcCh <- tc // 完成后send back, 其他goroutine recv
  328. }
  329. }
  330. var periodSet = []int{M1, M5, H1, D1}
  331. // 从dsb.chM读出实时行情数据, 分别缓存Tick和做Candle周期转换
  332. func (dsb *DSBase) doRead(tcCh chan *tconver) {
  333. mapTB := make(map[string]map[int]*tconver)
  334. for {
  335. m := <-dsb.chM
  336. var t *Tick
  337. if InsIdPrefix(m.InsId) == Lmax {
  338. t = Market2TickByBid(m)
  339. } else {
  340. t = Market2Tick(m)
  341. }
  342. // tick 缓存
  343. dsb.tmu.Lock()
  344. if _, ok := dsb.tmap[m.InsId]; !ok {
  345. dsb.tmap[m.InsId] = &tickBuf{}
  346. log.Println("@@@: dsb.tmap", m.InsId)
  347. }
  348. dsb.tmap[m.InsId].add(t)
  349. dsb.tmu.Unlock()
  350. // candle 不同周期缓存
  351. dsb.cmu.Lock()
  352. for _, period := range periodSet {
  353. if _, ok := dsb.cmap[period]; !ok {
  354. log.Fatal("_, ok := dsb.cmap[period] error")
  355. }
  356. if _, ok := dsb.cmap[period][m.InsId]; !ok {
  357. dsb.cmap[period][m.InsId] = &candleBuf{}
  358. log.Println("@@@: dsb.cmap", m.InsId, period)
  359. }
  360. }
  361. dsb.cmu.Unlock()
  362. //把tick数据转到不同周期的TickBuffer做周期转换
  363. if _, ok := mapTB[m.InsId]; !ok {
  364. mapTB[m.InsId] = make(map[int]*tconver)
  365. for _, period := range periodSet {
  366. cg, _ := base.NewCandle(period, 2, nil, 0)
  367. tc := &tconver{ch: make(chan *Tick, 1024), cg: cg, period: period, insId: m.InsId}
  368. mapTB[m.InsId][period] = tc
  369. tcCh <- tc
  370. }
  371. }
  372. // 把tick数据保存到不同周期转换chan中
  373. for _, period := range periodSet {
  374. mapTB[m.InsId][period].ch <- t
  375. }
  376. }
  377. }
  378. // saveTick 把一个tick数据写入本地文件, 并把文件信息记录数据库
  379. func (dsb *DSBase) saveTick(ts []Tick, insId string, zip bool) (string, error) {
  380. if len(ts) == 0 {
  381. return "", errors.New("len(ts) == 0")
  382. }
  383. t0 := ts[0]
  384. t := time.Unix(t0.Timestamp/1000, 0)
  385. dir := path.Join(dsb.dir, insId, fmt.Sprint(t.Year()), fmt.Sprint(t.Month()), fmt.Sprint(t.Day()))
  386. os.MkdirAll(dir, 0777)
  387. fname := path.Join(dir, fmt.Sprintf("%d-tick.TK", time.Now().UnixNano()))
  388. var err error
  389. if zip {
  390. err = SaveTicks(dsb.db, "", fname, ts, insId)
  391. } else {
  392. err = SaveTicksNoZip(dsb.db, "", fname, ts, insId)
  393. }
  394. if err != nil {
  395. return "", err
  396. }
  397. return fname, nil
  398. }
  399. // saveCandle 把一个candle数据写入本地文件, 并把文件信息记录数据库
  400. func (dsb *DSBase) saveCandle(insId string, period int, candles []Candle) error {
  401. if len(candles) == 0 {
  402. return nil
  403. }
  404. t := time.Unix(candles[0].Timestamp/1000, 0)
  405. dir := path.Join(dsb.dir, insId, fmt.Sprint(t.Year()))
  406. if period == H1 {
  407. dir = path.Join(dir, fmt.Sprint(t.Month()))
  408. } else if period < H1 {
  409. dir = path.Join(dir, fmt.Sprint(t.Day()))
  410. }
  411. os.MkdirAll(dir, 0777)
  412. bname := fmt.Sprintf("%d-candle.%s", time.Now().UnixNano(), PeriodNameMap[period])
  413. fname := path.Join(dir, bname)
  414. if period < H1 {
  415. return SaveCandles(dsb.db, "", fname, candles, insId, period)
  416. }
  417. return SaveH1OrD1(dsb.db, "", fname, candles, insId, period)
  418. }
  419. func ConvAndSaveCandles(db *MyDB, insId, path string, ticks []Tick) error {
  420. refer := path
  421. var candles []Candle
  422. pa := []int{M1, M5, H1, D1}
  423. for _, period := range pa {
  424. newpath := strings.Replace(path, "tick.TK", "candle."+PeriodNameMap[period], 1)
  425. fi, err := os.Stat(newpath)
  426. if err == nil && fi.Size() > 0 {
  427. return nil
  428. }
  429. //var err error
  430. if period == M1 {
  431. candles, err = convCandles0(ticks, M1)
  432. } else {
  433. candles, err = convCandles1(candles, period)
  434. }
  435. if err != nil {
  436. log.Println("convAndSaveCandles error:", err)
  437. if len(candles) == 0 {
  438. return err
  439. }
  440. }
  441. err = SaveCandles(db, refer, newpath, candles, insId, period)
  442. if err != nil {
  443. return err
  444. }
  445. }
  446. return nil
  447. }
  448. func convAndSaveCandles(db *MyDB, insId, path string, ticks []Tick) error {
  449. refer := path
  450. var candles []Candle
  451. pa := []int{M1, M5, H1, D1}
  452. for _, period := range pa {
  453. var err error
  454. if period == M1 {
  455. candles, err = convCandles0(ticks, M1)
  456. } else {
  457. candles, err = convCandles1(candles, period)
  458. }
  459. if err != nil {
  460. log.Println("convAndSaveCandles error:", err)
  461. if len(candles) == 0 {
  462. return err
  463. }
  464. }
  465. newpath := strings.Replace(path, "tick.TK", "candle."+PeriodNameMap[period], 1)
  466. err = SaveCandles(db, refer, newpath, candles, insId, period)
  467. if err != nil {
  468. return err
  469. }
  470. }
  471. return nil
  472. }
  473. func convCandles0(ticks []Tick, period int) ([]Candle, error) {
  474. r := NewTickBuf(ticks)
  475. return TickConvCandle(r, period)
  476. }
  477. func convCandles1(candles []Candle, period int) ([]Candle, error) {
  478. r := NewCandleBuf(candles)
  479. return ConvPeriod(r, period)
  480. }