// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package market // 本文件实现DataSource数据源的tick数据获取下载和保存 import ( "errors" "fmt" "io/ioutil" "log" "os" "path" "strings" "sync" "time" "unsafe" "tickserver/framework/base" "tickserver/framework/event" ) var Debug bool = false // for debug type InsName interface { GetInsName(insId string) string } type BufCandleMaker struct { candleGenerators []*base.Candle ohlcs []base.Ohlc tbuf *tickBuf cbufs []*CandleBuf } // 数据源接口 // lmax, easyforex, oanda, ctp以及未来的macoin等均实现此接口 type DataSource interface { SubIns() *event.Event Run() GetCacheTicks(insId string) ([]Tick, error) GetCacheCandles(insId string, period int) ([]Candle, error) GetTimeList(insId, period, beginStr string) ([]string, error) SaveAllTicks() } type DSBase struct { db *MyDB dir string chMs chan *Market chMl chan *Market chId chan string cmu sync.Mutex // 保护camp cmap map[int]map[string]*CandleBuf // 缓存最新的Candle tmu sync.Mutex // 保护tmap tmap map[string]*tickBuf // 缓存最新的Tick cmmu sync.Mutex bufCandleMakersMap map[string]*BufCandleMaker saveCh chan string } func makeCandleMap() map[int]map[string]*CandleBuf { cmap := make(map[int]map[string]*CandleBuf) for k, _ := range basePeriodSet { cmap[k] = make(map[string]*CandleBuf) } return cmap } func NewDsBase(db *MyDB, dir string) *DSBase { dsb := &DSBase{ db: db, dir: dir, chId: make(chan string, 1), chMs: make(chan *Market, 20480), chMl: make(chan *Market, 20480), cmap: makeCandleMap(), tmap: make(map[string]*tickBuf), bufCandleMakersMap: make(map[string]*BufCandleMaker), saveCh: make(chan string, 20480), } return dsb } func (dsb *DSBase) GetTimeList(insId, period, beginStr string) ([]string, error) { // log.Println("@@@: DSBase.GetLastCandles:", insId, period, n) // defer log.Println("###: DSBase.GetLastCandles:", insId, period, n) var year, month, day int fmt.Sscanf(beginStr, "%04d%02d%02d", &year, &month, &day) var timelist []string dir := path.Join(dsb.dir, insId, fmt.Sprint(year)) for { if _, err := os.Stat(dir); os.IsNotExist(err) { year++ if year > time.Now().Year() { break } dir = path.Join(dsb.dir, insId, fmt.Sprint(year)) continue } tl := getTimeList(dir, period, beginStr) timelist = append(timelist, tl...) year++ if year > time.Now().Year() { break } dir = path.Join(dsb.dir, insId, fmt.Sprint(year)) } return timelist, nil } func getTimeList(dir, period, beginStr string) []string { var timelist []string suffix := fmt.Sprintf(".%s.gz", period) infos, _ := ioutil.ReadDir(dir) for i := 0; i < len(infos); i++ { name := infos[i].Name() if strings.HasSuffix(name, suffix) { time := strings.Split(name, ".")[0] if time >= beginStr { timelist = append(timelist, time) } } } return timelist } func (dsb *DSBase) GetCacheCandles(insId string, period int) ([]Candle, error) { // log.Println("@@@: DSBase.GetLastCandles:", insId, period, n) // defer log.Println("###: DSBase.GetLastCandles:", insId, period, n) buf, ok := dsb.getBuf(insId, period) if !ok { msg := fmt.Sprintf("GetLastCandles error: %s insId is NOT in fzm exchange", insId) return nil, errors.New(msg) } buf.Lock() defer buf.Unlock() return buf.Buf[:], nil } func (dsb *DSBase) GetCacheTicks(insId string) ([]Tick, error) { dsb.tmu.Lock() defer dsb.tmu.Unlock() buf, ok := dsb.tmap[insId] if !ok { msg := fmt.Sprintf("GetLastTicks error: %s insId is NOT in fzm exchange", insId) return nil, errors.New(msg) } buf.Lock() defer buf.Unlock() return buf.buf[:], nil } func (dsb *DSBase) getBuf(insId string, period int) (*CandleBuf, bool) { dsb.cmu.Lock() defer dsb.cmu.Unlock() bufMap, ok := dsb.cmap[period] if !ok { return nil, false } buf, ok := bufMap[insId] if !ok { return nil, false } return buf, true } // 删除不需要的insId func (dsb *DSBase) Del(insId string) { dsb.chId <- insId } func (dsb *DSBase) Save(m *Market) { select { case dsb.chMs <- m: default: //log.Println("@@@:Save:", m.InsId, m.LastPrice) } } func (dsb *DSBase) SaveL(m *Market) { dsb.chMl <- m //select { //case dsb.chMl <- m: //default: //log.Println("@@@:Save:", m.InsId, m.LastPrice) //} } type candleInfo struct { c Candle insId string period int } type readerInfo struct { r *candleBuffer insId string period int prev *Candle } func (dsb *DSBase) NewCandleMaker(insId string) *BufCandleMaker { // tick 缓存 var tbuf *tickBuf var ok bool dsb.tmu.Lock() if tbuf, ok = dsb.tmap[insId]; !ok { tbuf = &tickBuf{} dsb.tmap[insId] = tbuf // log.Println("@@@: dsb.tmap", m.InsId) } dsb.tmu.Unlock() // candle 不同周期缓存 var cbufs []*CandleBuf dsb.cmu.Lock() for _, period := range periodSet { if _, ok := dsb.cmap[period]; !ok { log.Fatal("_, ok := dsb.cmap[period] error") } var cbuf *CandleBuf if cbuf, ok = dsb.cmap[period][insId]; !ok { cbuf = &CandleBuf{} dsb.cmap[period][insId] = cbuf // log.Println("@@@: dsb.cmap", m.InsId, period) } cbufs = append(cbufs, cbuf) } dsb.cmu.Unlock() candleGenerators := make([]*base.Candle, len(periodSet)) ohlcs := make([]base.Ohlc, len(periodSet)) for i, period := range periodSet { candleGenerators[i], _ = base.NewCandle(period, 2, nil, 0) if strings.HasPrefix(insId, Ctp) { candleGenerators[i].Set(base.CANDLE_AUTOCOMPLETE_MAX, 1) } ohlcs[i] = base.Ohlc{} } return &BufCandleMaker{ candleGenerators: candleGenerators[:], ohlcs: ohlcs[:], tbuf: tbuf, cbufs: cbufs, } } func (dsb *DSBase) SaveAllTicks() { dsb.tmu.Lock() defer dsb.tmu.Unlock() for i, v := range dsb.tmap { var saveTicks []Tick v.Lock() saveTicks = v.buf[:] v.buf = v.buf[0:0] v.Unlock() //log.Println("SaveAllTicks saving", i) SaveTickEx(dsb.dir, saveTicks, i, false) //if err != nil { //log.Println(fname, err) //} } } func (dsb *DSBase) SaveTicks() { for { insId := <-dsb.saveCh dsb.tmu.Lock() tbuf, ok := dsb.tmap[insId] dsb.tmu.Unlock() if ok { var saveTicks []Tick tbuf.Lock() if len(tbuf.buf) >= 2000 { saveTicks = tbuf.buf[:1000] tbuf.buf = tbuf.buf[1000:] } tbuf.Unlock() SaveTickEx(dsb.dir, saveTicks, insId, false) //if err != nil { //log.Println(fname, err) //} } } } func (dsb *DSBase) DoReadEx() error { go dsb.SaveTicks() for { var bLoaded bool var m *Market select { case m = <-dsb.chMs: bLoaded = false case m = <-dsb.chMl: bLoaded = true } var t *Tick if InsIdPrefix(m.InsId) == Lmax && !bLoaded { t = Market2TickByBid(m) } else { t = Market2Tick(m) } bcm, ok := dsb.bufCandleMakersMap[m.InsId] if !ok { bcm = dsb.NewCandleMaker(m.InsId) dsb.bufCandleMakersMap[m.InsId] = bcm } if !bLoaded { bSave := bcm.tbuf.add(t) if bSave { dsb.saveCh <- m.InsId } } for i, candleGenerator := range bcm.candleGenerators { tg := Tk2Tg(*t) num := candleGenerator.UpdateTick((*base.Tick)(unsafe.Pointer(&tg))) var candles []Candle if num == 0 { candleGenerator.Next(&bcm.ohlcs[i]) ohlcGo := bcm.ohlcs[i].ToGOStruct() candles = append(candles, OhlcGo2Candle(ohlcGo)) } else if num > 0 { for j := 0; j < num; j++ { candleGenerator.Next(&bcm.ohlcs[i]) ohlcGo := bcm.ohlcs[i].ToGOStruct() candles = append(candles, OhlcGo2Candle(ohlcGo)) } } else { //log.Println("tick error.") } for _, v := range candles { last := bcm.cbufs[i].Last() if last != nil && last.Timestamp == v.Timestamp { *last = v } else { bcm.cbufs[i].add(&v, periodSet[i]) } } } } return nil } type tbuf struct { tb *tickBuf insId string } type cbuf struct { cb *CandleBuf insId string period int } var periodSet = []int{M1, M5, H1, D1} func SaveCandlesEx(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) { if len(candles) == 0 { return "", nil } t := time.Unix(candles[0].Timestamp/1000, 0) dir := path.Join(dataDir, insId) if period < D1 { dir = path.Join(dir, fmt.Sprint(t.Year())) } os.MkdirAll(dir, 0777) var bname string if period < D1 { bname = fmt.Sprintf("%04d%02d%02d.%s.gz", t.Year(), t.Month(), t.Day(), PeriodNameMap[period]) } else { bname = fmt.Sprintf("%s.gz", PeriodNameMap[period]) } fname := path.Join(dir, bname) if !bTruncate { candles, _ = combinEx(fname, candles) } // 新建并写入文件 w, err := os.Create(fname) if err != nil { return "", errors.New("SaveCandles os.Create error:" + err.Error()) } defer w.Close() err = ZipCBuf(w, candles) if err != nil { return "", errors.New("SaveCandles ZipCBuf error:" + err.Error()) } return fname, nil } func SaveCandlesTmp(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) { if period == D1 { return saveCandlesTmp(dataDir, insId, candles, period, bTruncate) } if len(candles) == 0 { return "", nil } oneDay := int64(1000 * 3600 * 24) baseT := candles[0].Timestamp / oneDay begin := 0 for k, v := range candles { tmpBaseT := v.Timestamp / oneDay if tmpBaseT != baseT { saveCandlesTmp(dataDir, insId, candles[begin:k], period, bTruncate) //log.Println(fname, err) begin = k baseT = tmpBaseT } } return saveCandlesTmp(dataDir, insId, candles[begin:], period, bTruncate) } func saveCandlesTmp(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) { if len(candles) == 0 { return "", nil } t := time.Unix(candles[0].Timestamp/1000, 0).UTC() dir := path.Join(dataDir, insId) if period < D1 { dir = path.Join(dir, fmt.Sprint(t.Year())) } os.MkdirAll(dir, 0777) var bname string if period < D1 { bname = fmt.Sprintf("%04d%02d%02d.%s.gz", t.Year(), t.Month(), t.Day(), PeriodNameMap[period]) } else { bname = fmt.Sprintf("%s.gz", PeriodNameMap[period]) } fname := path.Join(dir, bname) if period != D1 { tmpfname := fname fname = tmpfname + ".tmp" if _, err := os.Stat(fname); os.IsNotExist(err) { if _, err := os.Stat(tmpfname); err == nil { for err = os.Rename(tmpfname, fname); err != nil; err = os.Rename(tmpfname, fname) { time.Sleep(time.Second) } } } } if !bTruncate { candles, _ = combinEx(fname, candles) } // 新建并写入文件 w, err := os.Create(fname) if err != nil { return "", errors.New("SaveCandles os.Create error:" + err.Error()) } defer w.Close() err = ZipCBuf(w, candles) if err != nil { return "", errors.New("SaveCandles ZipCBuf error:" + err.Error()) } //if period != D1 { //if _, err := os.Stat(tmpfname); os.IsNotExist(err) { //if _, err := os.Stat(fname); err == nil { //for err = os.Rename(fname, tmpfname); err != nil; err = os.Rename(fname, tmpfname) { //time.Sleep(time.Second) //} //} //} //} return fname, nil } func convCandles0(ticks []Tick, insId string, period int) ([]Candle, error) { r := NewTickBuf(ticks) return TickConvCandle(r, insId, period) } func convCandles1(candles []Candle, insId string, period int) ([]Candle, error) { r := NewCandleBuf(candles) return ConvPeriod(r, insId, period) }