ds.go 11 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package market
  3. // 本文件实现DataSource数据源的tick数据获取下载和保存
  4. import (
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "path"
  11. "strings"
  12. "sync"
  13. "time"
  14. "unsafe"
  15. "tickserver/framework/base"
  16. "tickserver/framework/event"
  17. )
  // Debug is a package-level debugging switch. It is off by default.
  var Debug = false
  // InsName is implemented by anything that can turn an instrument ID into a
  // display string.
  type InsName interface {
  	// GetInsName takes an instrument ID and returns a string for it
  	// (presumably the instrument's name; confirm against implementers).
  	GetInsName(insId string) string
  }
  22. type BufCandleMaker struct {
  23. candleGenerators []*base.Candle
  24. ohlcs []base.Ohlc
  25. tbuf *tickBuf
  26. cbufs []*CandleBuf
  27. }
  28. // 数据源接口
  29. // lmax, easyforex, oanda, ctp以及未来的macoin等均实现此接口
  30. type DataSource interface {
  31. SubIns() *event.Event
  32. Run()
  33. GetCacheTicks(insId string) ([]Tick, error)
  34. GetCacheCandles(insId string, period int) ([]Candle, error)
  35. GetTimeList(insId, period, beginStr string) ([]string, error)
  36. SaveAllTicks()
  37. }
  38. type DSBase struct {
  39. db *MyDB
  40. dir string
  41. chMs chan *Market
  42. chMl chan *Market
  43. chId chan string
  44. cmu sync.Mutex // 保护camp
  45. cmap map[int]map[string]*CandleBuf // 缓存最新的Candle
  46. tmu sync.Mutex // 保护tmap
  47. tmap map[string]*tickBuf // 缓存最新的Tick
  48. cmmu sync.Mutex
  49. bufCandleMakersMap map[string]*BufCandleMaker
  50. saveCh chan string
  51. }
  52. func makeCandleMap() map[int]map[string]*CandleBuf {
  53. cmap := make(map[int]map[string]*CandleBuf)
  54. for k, _ := range basePeriodSet {
  55. cmap[k] = make(map[string]*CandleBuf)
  56. }
  57. return cmap
  58. }
  59. func NewDsBase(db *MyDB, dir string) *DSBase {
  60. dsb := &DSBase{
  61. db: db,
  62. dir: dir,
  63. chId: make(chan string, 1),
  64. chMs: make(chan *Market, 20480),
  65. chMl: make(chan *Market, 20480),
  66. cmap: makeCandleMap(),
  67. tmap: make(map[string]*tickBuf),
  68. bufCandleMakersMap: make(map[string]*BufCandleMaker),
  69. saveCh: make(chan string, 20480),
  70. }
  71. return dsb
  72. }
  73. func (dsb *DSBase) GetTimeList(insId, period, beginStr string) ([]string, error) {
  74. // log.Println("@@@: DSBase.GetLastCandles:", insId, period, n)
  75. // defer log.Println("###: DSBase.GetLastCandles:", insId, period, n)
  76. var year, month, day int
  77. fmt.Sscanf(beginStr, "%04d%02d%02d", &year, &month, &day)
  78. var timelist []string
  79. dir := path.Join(dsb.dir, insId, fmt.Sprint(year))
  80. for {
  81. if _, err := os.Stat(dir); os.IsNotExist(err) {
  82. year++
  83. if year > time.Now().Year() {
  84. break
  85. }
  86. dir = path.Join(dsb.dir, insId, fmt.Sprint(year))
  87. continue
  88. }
  89. tl := getTimeList(dir, period, beginStr)
  90. timelist = append(timelist, tl...)
  91. year++
  92. if year > time.Now().Year() {
  93. break
  94. }
  95. dir = path.Join(dsb.dir, insId, fmt.Sprint(year))
  96. }
  97. return timelist, nil
  98. }
  // getTimeList scans dir for files named "<time>.<period>.gz" and returns the
  // <time> part (everything before the first dot) of each one that compares
  // >= beginStr as a string. Entries come back in the order ioutil.ReadDir
  // yields them. A directory that cannot be read yields a nil result.
  func getTimeList(dir, period, beginStr string) []string {
  	wantSuffix := "." + period + ".gz"
  	entries, _ := ioutil.ReadDir(dir) // best effort: unreadable dir means no entries

  	var timelist []string
  	for _, entry := range entries {
  		name := entry.Name()
  		if !strings.HasSuffix(name, wantSuffix) {
  			continue
  		}
  		if stamp := strings.Split(name, ".")[0]; stamp >= beginStr {
  			timelist = append(timelist, stamp)
  		}
  	}
  	return timelist
  }
  114. func (dsb *DSBase) GetCacheCandles(insId string, period int) ([]Candle, error) {
  115. // log.Println("@@@: DSBase.GetLastCandles:", insId, period, n)
  116. // defer log.Println("###: DSBase.GetLastCandles:", insId, period, n)
  117. buf, ok := dsb.getBuf(insId, period)
  118. if !ok {
  119. msg := fmt.Sprintf("GetLastCandles error: %s insId is NOT in fzm exchange", insId)
  120. return nil, errors.New(msg)
  121. }
  122. buf.Lock()
  123. defer buf.Unlock()
  124. return buf.Buf[:], nil
  125. }
  126. func (dsb *DSBase) GetCacheTicks(insId string) ([]Tick, error) {
  127. dsb.tmu.Lock()
  128. defer dsb.tmu.Unlock()
  129. buf, ok := dsb.tmap[insId]
  130. if !ok {
  131. msg := fmt.Sprintf("GetLastTicks error: %s insId is NOT in fzm exchange", insId)
  132. return nil, errors.New(msg)
  133. }
  134. buf.Lock()
  135. defer buf.Unlock()
  136. return buf.buf[:], nil
  137. }
  138. func (dsb *DSBase) getBuf(insId string, period int) (*CandleBuf, bool) {
  139. dsb.cmu.Lock()
  140. defer dsb.cmu.Unlock()
  141. bufMap, ok := dsb.cmap[period]
  142. if !ok {
  143. return nil, false
  144. }
  145. buf, ok := bufMap[insId]
  146. if !ok {
  147. return nil, false
  148. }
  149. return buf, true
  150. }
  151. // 删除不需要的insId
  152. func (dsb *DSBase) Del(insId string) {
  153. dsb.chId <- insId
  154. }
  155. func (dsb *DSBase) Save(m *Market) {
  156. select {
  157. case dsb.chMs <- m:
  158. default:
  159. //log.Println("@@@:Save:", m.InsId, m.LastPrice)
  160. }
  161. }
  162. func (dsb *DSBase) SaveL(m *Market) {
  163. dsb.chMl <- m
  164. //select {
  165. //case dsb.chMl <- m:
  166. //default:
  167. //log.Println("@@@:Save:", m.InsId, m.LastPrice)
  168. //}
  169. }
  170. type candleInfo struct {
  171. c Candle
  172. insId string
  173. period int
  174. }
  175. type readerInfo struct {
  176. r *candleBuffer
  177. insId string
  178. period int
  179. prev *Candle
  180. }
  181. func (dsb *DSBase) NewCandleMaker(insId string) *BufCandleMaker {
  182. // tick 缓存
  183. var tbuf *tickBuf
  184. var ok bool
  185. dsb.tmu.Lock()
  186. if tbuf, ok = dsb.tmap[insId]; !ok {
  187. tbuf = &tickBuf{}
  188. dsb.tmap[insId] = tbuf
  189. // log.Println("@@@: dsb.tmap", m.InsId)
  190. }
  191. dsb.tmu.Unlock()
  192. // candle 不同周期缓存
  193. var cbufs []*CandleBuf
  194. dsb.cmu.Lock()
  195. for _, period := range periodSet {
  196. if _, ok := dsb.cmap[period]; !ok {
  197. log.Fatal("_, ok := dsb.cmap[period] error")
  198. }
  199. var cbuf *CandleBuf
  200. if cbuf, ok = dsb.cmap[period][insId]; !ok {
  201. cbuf = &CandleBuf{}
  202. dsb.cmap[period][insId] = cbuf
  203. // log.Println("@@@: dsb.cmap", m.InsId, period)
  204. }
  205. cbufs = append(cbufs, cbuf)
  206. }
  207. dsb.cmu.Unlock()
  208. candleGenerators := make([]*base.Candle, len(periodSet))
  209. ohlcs := make([]base.Ohlc, len(periodSet))
  210. for i, period := range periodSet {
  211. candleGenerators[i], _ = base.NewCandle(period, 2, nil, 0)
  212. if strings.HasPrefix(insId, Ctp) {
  213. candleGenerators[i].Set(base.CANDLE_AUTOCOMPLETE_MAX, 1)
  214. }
  215. ohlcs[i] = base.Ohlc{}
  216. }
  217. return &BufCandleMaker{
  218. candleGenerators: candleGenerators[:],
  219. ohlcs: ohlcs[:],
  220. tbuf: tbuf,
  221. cbufs: cbufs,
  222. }
  223. }
  224. func (dsb *DSBase) SaveAllTicks() {
  225. dsb.tmu.Lock()
  226. defer dsb.tmu.Unlock()
  227. for i, v := range dsb.tmap {
  228. var saveTicks []Tick
  229. v.Lock()
  230. saveTicks = v.buf[:]
  231. v.buf = v.buf[0:0]
  232. v.Unlock()
  233. //log.Println("SaveAllTicks saving", i)
  234. SaveTickEx(dsb.dir, saveTicks, i, false)
  235. //if err != nil {
  236. //log.Println(fname, err)
  237. //}
  238. }
  239. }
  240. func (dsb *DSBase) SaveTicks() {
  241. for {
  242. insId := <-dsb.saveCh
  243. dsb.tmu.Lock()
  244. tbuf, ok := dsb.tmap[insId]
  245. dsb.tmu.Unlock()
  246. if ok {
  247. var saveTicks []Tick
  248. tbuf.Lock()
  249. if len(tbuf.buf) >= 2000 {
  250. saveTicks = tbuf.buf[:1000]
  251. tbuf.buf = tbuf.buf[1000:]
  252. }
  253. tbuf.Unlock()
  254. SaveTickEx(dsb.dir, saveTicks, insId, false)
  255. //if err != nil {
  256. //log.Println(fname, err)
  257. //}
  258. }
  259. }
  260. }
  261. func (dsb *DSBase) DoReadEx() error {
  262. go dsb.SaveTicks()
  263. for {
  264. var bLoaded bool
  265. var m *Market
  266. select {
  267. case m = <-dsb.chMs:
  268. bLoaded = false
  269. case m = <-dsb.chMl:
  270. bLoaded = true
  271. }
  272. var t *Tick
  273. if InsIdPrefix(m.InsId) == Lmax && !bLoaded {
  274. t = Market2TickByBid(m)
  275. } else {
  276. t = Market2Tick(m)
  277. }
  278. bcm, ok := dsb.bufCandleMakersMap[m.InsId]
  279. if !ok {
  280. bcm = dsb.NewCandleMaker(m.InsId)
  281. dsb.bufCandleMakersMap[m.InsId] = bcm
  282. }
  283. if !bLoaded {
  284. bSave := bcm.tbuf.add(t)
  285. if bSave {
  286. dsb.saveCh <- m.InsId
  287. }
  288. }
  289. for i, candleGenerator := range bcm.candleGenerators {
  290. tg := Tk2Tg(*t)
  291. num := candleGenerator.UpdateTick((*base.Tick)(unsafe.Pointer(&tg)))
  292. var candles []Candle
  293. if num == 0 {
  294. candleGenerator.Next(&bcm.ohlcs[i])
  295. ohlcGo := bcm.ohlcs[i].ToGOStruct()
  296. candles = append(candles, OhlcGo2Candle(ohlcGo))
  297. } else if num > 0 {
  298. for j := 0; j < num; j++ {
  299. candleGenerator.Next(&bcm.ohlcs[i])
  300. ohlcGo := bcm.ohlcs[i].ToGOStruct()
  301. candles = append(candles, OhlcGo2Candle(ohlcGo))
  302. }
  303. } else {
  304. //log.Println("tick error.")
  305. }
  306. for _, v := range candles {
  307. last := bcm.cbufs[i].Last()
  308. if last != nil && last.Timestamp == v.Timestamp {
  309. *last = v
  310. } else {
  311. bcm.cbufs[i].add(&v, periodSet[i])
  312. }
  313. }
  314. }
  315. }
  316. return nil
  317. }
  318. type tbuf struct {
  319. tb *tickBuf
  320. insId string
  321. }
  322. type cbuf struct {
  323. cb *CandleBuf
  324. insId string
  325. period int
  326. }
  327. var periodSet = []int{M1, M5, H1, D1}
  328. func SaveCandlesEx(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) {
  329. if len(candles) == 0 {
  330. return "", nil
  331. }
  332. t := time.Unix(candles[0].Timestamp/1000, 0)
  333. dir := path.Join(dataDir, insId)
  334. if period < D1 {
  335. dir = path.Join(dir, fmt.Sprint(t.Year()))
  336. }
  337. os.MkdirAll(dir, 0777)
  338. var bname string
  339. if period < D1 {
  340. bname = fmt.Sprintf("%04d%02d%02d.%s.gz", t.Year(), t.Month(), t.Day(), PeriodNameMap[period])
  341. } else {
  342. bname = fmt.Sprintf("%s.gz", PeriodNameMap[period])
  343. }
  344. fname := path.Join(dir, bname)
  345. if !bTruncate {
  346. candles, _ = combinEx(fname, candles)
  347. }
  348. // 新建并写入文件
  349. w, err := os.Create(fname)
  350. if err != nil {
  351. return "", errors.New("SaveCandles os.Create error:" + err.Error())
  352. }
  353. defer w.Close()
  354. err = ZipCBuf(w, candles)
  355. if err != nil {
  356. return "", errors.New("SaveCandles ZipCBuf error:" + err.Error())
  357. }
  358. return fname, nil
  359. }
  360. func SaveCandlesTmp(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) {
  361. if period == D1 {
  362. return saveCandlesTmp(dataDir, insId, candles, period, bTruncate)
  363. }
  364. if len(candles) == 0 {
  365. return "", nil
  366. }
  367. oneDay := int64(1000 * 3600 * 24)
  368. baseT := candles[0].Timestamp / oneDay
  369. begin := 0
  370. for k, v := range candles {
  371. tmpBaseT := v.Timestamp / oneDay
  372. if tmpBaseT != baseT {
  373. saveCandlesTmp(dataDir, insId, candles[begin:k], period, bTruncate)
  374. //log.Println(fname, err)
  375. begin = k
  376. baseT = tmpBaseT
  377. }
  378. }
  379. return saveCandlesTmp(dataDir, insId, candles[begin:], period, bTruncate)
  380. }
  381. func saveCandlesTmp(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) {
  382. if len(candles) == 0 {
  383. return "", nil
  384. }
  385. t := time.Unix(candles[0].Timestamp/1000, 0).UTC()
  386. dir := path.Join(dataDir, insId)
  387. if period < D1 {
  388. dir = path.Join(dir, fmt.Sprint(t.Year()))
  389. }
  390. os.MkdirAll(dir, 0777)
  391. var bname string
  392. if period < D1 {
  393. bname = fmt.Sprintf("%04d%02d%02d.%s.gz", t.Year(), t.Month(), t.Day(), PeriodNameMap[period])
  394. } else {
  395. bname = fmt.Sprintf("%s.gz", PeriodNameMap[period])
  396. }
  397. fname := path.Join(dir, bname)
  398. if period != D1 {
  399. tmpfname := fname
  400. fname = tmpfname + ".tmp"
  401. if _, err := os.Stat(fname); os.IsNotExist(err) {
  402. if _, err := os.Stat(tmpfname); err == nil {
  403. for err = os.Rename(tmpfname, fname); err != nil; err = os.Rename(tmpfname, fname) {
  404. time.Sleep(time.Second)
  405. }
  406. }
  407. }
  408. }
  409. if !bTruncate {
  410. candles, _ = combinEx(fname, candles)
  411. }
  412. // 新建并写入文件
  413. w, err := os.Create(fname)
  414. if err != nil {
  415. return "", errors.New("SaveCandles os.Create error:" + err.Error())
  416. }
  417. defer w.Close()
  418. err = ZipCBuf(w, candles)
  419. if err != nil {
  420. return "", errors.New("SaveCandles ZipCBuf error:" + err.Error())
  421. }
  422. //if period != D1 {
  423. //if _, err := os.Stat(tmpfname); os.IsNotExist(err) {
  424. //if _, err := os.Stat(fname); err == nil {
  425. //for err = os.Rename(fname, tmpfname); err != nil; err = os.Rename(fname, tmpfname) {
  426. //time.Sleep(time.Second)
  427. //}
  428. //}
  429. //}
  430. //}
  431. return fname, nil
  432. }
  433. func convCandles0(ticks []Tick, insId string, period int) ([]Candle, error) {
  434. r := NewTickBuf(ticks)
  435. return TickConvCandle(r, insId, period)
  436. }
  437. func convCandles1(candles []Candle, insId string, period int) ([]Candle, error) {
  438. r := NewCandleBuf(candles)
  439. return ConvPeriod(r, insId, period)
  440. }