// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package market // 本文件实现DataSource数据源的tick数据获取下载和保存 import ( "errors" "fmt" "log" "os" "path" "strings" "sync" "time" "dev.33.cn/framework/base" "dev.33.cn/framework/event" ) var Debug bool = false // for debug type InsName interface { GetInsName(insId string) string } // 数据源接口 // lmax, easyforex, oanda, ctp以及未来的macoin等均实现此接口 type DataSource interface { SubIns() *event.Event Run() GetLastTicks(insId string, n int) ([]Tick, error) GetLastCandles(insId string, period, n int) ([]Candle, error) } type DSBase struct { db *MyDB dir string chM chan *Market chId chan string cmu sync.Mutex // 保护camp cmap map[int]map[string]*candleBuf // 缓存最新的Candle tmu sync.Mutex // 保护tmap tmap map[string]*tickBuf // 缓存最新的Tick } func makeCandleMap() map[int]map[string]*candleBuf { cmap := make(map[int]map[string]*candleBuf) for k, _ := range basePeriodSet { cmap[k] = make(map[string]*candleBuf) } return cmap } func NewDsBase(db *MyDB, dir string) *DSBase { dsb := &DSBase{ db: db, dir: dir, chId: make(chan string, 1), chM: make(chan *Market, 10240), cmap: makeCandleMap(), tmap: make(map[string]*tickBuf), } return dsb } func (dsb *DSBase) GetLastCandles(insId string, period, n int) ([]Candle, error) { log.Println("@@@: DSBase.GetLastCandles:", insId, period, n) defer log.Println("###: DSBase.GetLastCandles:", insId, period, n) buf, ok := dsb.getBuf(insId, period) if !ok { msg := fmt.Sprintf("GetLastCandles error: %s insId is NOT in fzm exchange", insId) return nil, errors.New(msg) } buf.Lock() defer buf.Unlock() p := len(buf.buf) - n if p < 0 { p = 0 } return buf.buf[p:], nil } func (dsb *DSBase) GetLastTicks(insId string, n int) ([]Tick, error) { dsb.tmu.Lock() defer dsb.tmu.Unlock() buf, ok := dsb.tmap[insId] if !ok { msg := fmt.Sprintf("GetLastTicks error: %s insId is NOT in fzm exchange", insId) return nil, errors.New(msg) } buf.Lock() defer buf.Unlock() p := len(buf.buf) - n if p < 0 { p = 0 } return buf.buf[p:], nil } func (dsb *DSBase) getBuf(insId string, period int) (*candleBuf, bool) { dsb.cmu.Lock() defer dsb.cmu.Unlock() bufMap, ok := dsb.cmap[period] if !ok { return nil, false } buf, ok := bufMap[insId] if !ok { return nil, false } return buf, true } // 删除不需要的insId func (dsb *DSBase) Del(insId string) { dsb.chId <- insId } func (dsb *DSBase) Save(m *Market) { select { case dsb.chM <- m: default: log.Println("@@@:Save:", m.InsId, m.LastPrice) } } type candleInfo struct { c Candle insId string period int } type readerInfo struct { r *candleBuffer insId string period int prev *Candle } // 每小时检查一次, 删除超过n数据 func (dsb *DSBase) saveCheck() { ticker := time.Tick(time.Hour) for _ = range ticker { dsb.cmu.Lock() for _, m := range dsb.cmap { for _, buf := range m { n := 1440 // M1一天的, 其他周期不论 buf.Lock() l := len(buf.buf) if l > n { buf.buf = buf.buf[l-n:] } buf.Unlock() } } dsb.cmu.Unlock() dsb.tmu.Lock() for insId, buf := range dsb.tmap { dsName := InsIdPrefix(insId) n := 0 switch dsName { case Lmax, Saxo: n = 3600 * 10 * 6 // 大约缓存6小时 case Ctp, Dzh: n = 3600 * 2 * 3 // 大约缓存3小时 default: n = 8640 // easyforex等 大约1天 } buf.Lock() l := len(buf.buf) if l > n { buf.buf = buf.buf[l-n:] } buf.Unlock() } dsb.tmu.Unlock() } } // 保存到内存并保存为文件 func (dsb *DSBase) RunSave(n int) { log.Println("@@@:RunSave:", n) tcCh := make(chan *tconver, 40960) // 足够大,包括所有的品种 go dsb.doRead(tcCh) for i := 0; i < n; i++ { go dsb.doConv(tcCh) } // 保存数据到文件 if dsb.db != nil { if Debug { // 测试时每小时保存一次 ticker := time.Tick(time.Hour) for _ = range ticker { dsb.doSave() } } else { // 每天定时保存 ticker := time.Tick(time.Second * 30) for t := range ticker { if t.Hour() == 5 && t.Minute() == 0 { // 5:00点时保存 dsb.doSave() dsb.saveCheck() } } } } else { dsb.saveCheck() } } type tbuf struct { tb *tickBuf insId string } type cbuf struct { cb *candleBuf insId string period int } // 把缓存数据写入文件, 所有K线周期数据都用缓存 func (dsb *DSBase) doSave0() { tCh := make(chan tbuf, 8192) cCh := make(chan cbuf, 8192*4) for i := 0; i < 4; i++ { go func() { for { select { case t := <-tCh: tb := t.tb tb.Lock() dsb.saveTick(tb.buf, t.insId, true) tb.Unlock() case c := <-cCh: cb := c.cb cb.Lock() dsb.saveCandle(c.insId, c.period, cb.buf) cb.Unlock() } } }() } dsb.tmu.Lock() for insId, tb := range dsb.tmap { tCh <- tbuf{tb, insId} } dsb.tmu.Unlock() close(tCh) dsb.cmu.Lock() for period, m := range dsb.cmap { for insId, cb := range m { cCh <- cbuf{cb, insId, period} } } dsb.cmu.Unlock() close(cCh) } // 把缓存数据写入文件, K线数据使用tick转换 func (dsb *DSBase) doSave() { ch := make(chan tbuf, 8192) for i := 0; i < 4; i++ { go func() { for t := range ch { tb := t.tb insId := t.insId fname, err := dsb.saveTick(tb.buf, insId, true) if err == nil { convAndSaveCandles(dsb.db, insId, fname, tb.buf) } } }() } dsb.tmu.Lock() for insId, tb := range dsb.tmap { ch <- tbuf{tb, insId} } dsb.tmu.Unlock() close(ch) } // 从chR中读出Candle, 周期转换, 存储到chC中 func (dsb *DSBase) doConv0(chR chan *readerInfo) { for { ri := <-chR select { case ce := <-ri.r.ch: c, err := ce.candle, ce.err if err != nil { log.Println(err) break // break select } dsb.cmu.Lock() cbuf, ok := dsb.cmap[ri.period][ri.insId] dsb.cmu.Unlock() if !ok { break // break select } if cbuf.leng() == 0 { cbuf.add(c) } last := cbuf.last() if last.Timestamp != c.Timestamp { cbuf.add(c) } else { *last = *c } default: time.Sleep(time.Millisecond * 1) } chR <- ri } } // 从chR中读出Candle, 周期转换, 存储到chC中 // tc.ch 保证了tick的顺序, 保证每个tconver(tc)的ch 同时只有一个goroutine在操作 // 这样doConv就可以并发执行 func (dsb *DSBase) doConv(tcCh chan *tconver) { for { tc := <-tcCh dsb.cmu.Lock() cbuf, ok := dsb.cmap[tc.period][tc.insId] dsb.cmu.Unlock() if ok { select { case t := <-tc.ch: if t == nil { break // break select } c := tc.convEx(t) if tc.period == M1 { //testCandle(0, nil, t, c, true) } for _, v := range c { if v == nil { break } if cbuf.leng() == 0 { cbuf.add(v) } last := cbuf.last() if last.Timestamp != v.Timestamp { cbuf.add(v) } else { *last = *v } } default: time.Sleep(time.Microsecond * 1) } } tcCh <- tc // 完成后send back, 其他goroutine recv } } var periodSet = []int{M1, M5, H1, D1} // 从dsb.chM读出实时行情数据, 分别缓存Tick和做Candle周期转换 func (dsb *DSBase) doRead(tcCh chan *tconver) { mapTB := make(map[string]map[int]*tconver) for { m := <-dsb.chM var t *Tick if InsIdPrefix(m.InsId) == Lmax { t = Market2TickByBid(m) } else { t = Market2Tick(m) } // tick 缓存 dsb.tmu.Lock() if _, ok := dsb.tmap[m.InsId]; !ok { dsb.tmap[m.InsId] = &tickBuf{} log.Println("@@@: dsb.tmap", m.InsId) } dsb.tmap[m.InsId].add(t) dsb.tmu.Unlock() // candle 不同周期缓存 dsb.cmu.Lock() for _, period := range periodSet { if _, ok := dsb.cmap[period]; !ok { log.Fatal("_, ok := dsb.cmap[period] error") } if _, ok := dsb.cmap[period][m.InsId]; !ok { dsb.cmap[period][m.InsId] = &candleBuf{} log.Println("@@@: dsb.cmap", m.InsId, period) } } dsb.cmu.Unlock() //把tick数据转到不同周期的TickBuffer做周期转换 if _, ok := mapTB[m.InsId]; !ok { mapTB[m.InsId] = make(map[int]*tconver) for _, period := range periodSet { cg, _ := base.NewCandle(period, 2, nil, 0) tc := &tconver{ch: make(chan *Tick, 1024), cg: cg, period: period, insId: m.InsId} mapTB[m.InsId][period] = tc tcCh <- tc } } // 把tick数据保存到不同周期转换chan中 for _, period := range periodSet { mapTB[m.InsId][period].ch <- t } } } // saveTick 把一个tick数据写入本地文件, 并把文件信息记录数据库 func (dsb *DSBase) saveTick(ts []Tick, insId string, zip bool) (string, error) { if len(ts) == 0 { return "", errors.New("len(ts) == 0") } t0 := ts[0] t := time.Unix(t0.Timestamp/1000, 0) dir := path.Join(dsb.dir, insId, fmt.Sprint(t.Year()), fmt.Sprint(t.Month()), fmt.Sprint(t.Day())) os.MkdirAll(dir, 0777) fname := path.Join(dir, fmt.Sprintf("%d-tick.TK", time.Now().UnixNano())) var err error if zip { err = SaveTicks(dsb.db, "", fname, ts, insId) } else { err = SaveTicksNoZip(dsb.db, "", fname, ts, insId) } if err != nil { return "", err } return fname, nil } // saveCandle 把一个candle数据写入本地文件, 并把文件信息记录数据库 func (dsb *DSBase) saveCandle(insId string, period int, candles []Candle) error { if len(candles) == 0 { return nil } t := time.Unix(candles[0].Timestamp/1000, 0) dir := path.Join(dsb.dir, insId, fmt.Sprint(t.Year())) if period == H1 { dir = path.Join(dir, fmt.Sprint(t.Month())) } else if period < H1 { dir = path.Join(dir, fmt.Sprint(t.Day())) } os.MkdirAll(dir, 0777) bname := fmt.Sprintf("%d-candle.%s", time.Now().UnixNano(), PeriodNameMap[period]) fname := path.Join(dir, bname) if period < H1 { return SaveCandles(dsb.db, "", fname, candles, insId, period) } return SaveH1OrD1(dsb.db, "", fname, candles, insId, period) } func ConvAndSaveCandles(db *MyDB, insId, path string, ticks []Tick) error { refer := path var candles []Candle pa := []int{M1, M5, H1, D1} for _, period := range pa { newpath := strings.Replace(path, "tick.TK", "candle."+PeriodNameMap[period], 1) fi, err := os.Stat(newpath) if err == nil && fi.Size() > 0 { return nil } //var err error if period == M1 { candles, err = convCandles0(ticks, M1) } else { candles, err = convCandles1(candles, period) } if err != nil { log.Println("convAndSaveCandles error:", err) if len(candles) == 0 { return err } } err = SaveCandles(db, refer, newpath, candles, insId, period) if err != nil { return err } } return nil } func convAndSaveCandles(db *MyDB, insId, path string, ticks []Tick) error { refer := path var candles []Candle pa := []int{M1, M5, H1, D1} for _, period := range pa { var err error if period == M1 { candles, err = convCandles0(ticks, M1) } else { candles, err = convCandles1(candles, period) } if err != nil { log.Println("convAndSaveCandles error:", err) if len(candles) == 0 { return err } } newpath := strings.Replace(path, "tick.TK", "candle."+PeriodNameMap[period], 1) err = SaveCandles(db, refer, newpath, candles, insId, period) if err != nil { return err } } return nil } func convCandles0(ticks []Tick, period int) ([]Candle, error) { r := NewTickBuf(ticks) return TickConvCandle(r, period) } func convCandles1(candles []Candle, period int) ([]Candle, error) { r := NewCandleBuf(candles) return ConvPeriod(r, period) }