|
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package market
- // 本文件实现DataSource数据源的tick数据获取下载和保存
- import (
- "errors"
- "fmt"
- "io/ioutil"
- "log"
- "os"
- "path"
- "strings"
- "sync"
- "time"
- "unsafe"
- "tickserver/framework/base"
- "tickserver/framework/event"
- )
- var Debug bool = false // for debug
- type InsName interface {
- GetInsName(insId string) string
- }
- type BufCandleMaker struct {
- candleGenerators []*base.Candle
- ohlcs []base.Ohlc
- tbuf *tickBuf
- cbufs []*CandleBuf
- }
- // 数据源接口
- // lmax, easyforex, oanda, ctp以及未来的macoin等均实现此接口
- type DataSource interface {
- SubIns() *event.Event
- Run()
- GetCacheTicks(insId string) ([]Tick, error)
- GetCacheCandles(insId string, period int) ([]Candle, error)
- GetTimeList(insId, period, beginStr string) ([]string, error)
- SaveAllTicks()
- }
- type DSBase struct {
- db *MyDB
- dir string
- chMs chan *Market
- chMl chan *Market
- chId chan string
- cmu sync.Mutex // 保护camp
- cmap map[int]map[string]*CandleBuf // 缓存最新的Candle
- tmu sync.Mutex // 保护tmap
- tmap map[string]*tickBuf // 缓存最新的Tick
- cmmu sync.Mutex
- bufCandleMakersMap map[string]*BufCandleMaker
- saveCh chan string
- }
- func makeCandleMap() map[int]map[string]*CandleBuf {
- cmap := make(map[int]map[string]*CandleBuf)
- for k, _ := range basePeriodSet {
- cmap[k] = make(map[string]*CandleBuf)
- }
- return cmap
- }
- func NewDsBase(db *MyDB, dir string) *DSBase {
- dsb := &DSBase{
- db: db,
- dir: dir,
- chId: make(chan string, 1),
- chMs: make(chan *Market, 20480),
- chMl: make(chan *Market, 20480),
- cmap: makeCandleMap(),
- tmap: make(map[string]*tickBuf),
- bufCandleMakersMap: make(map[string]*BufCandleMaker),
- saveCh: make(chan string, 20480),
- }
- return dsb
- }
- func (dsb *DSBase) GetTimeList(insId, period, beginStr string) ([]string, error) {
- // log.Println("@@@: DSBase.GetLastCandles:", insId, period, n)
- // defer log.Println("###: DSBase.GetLastCandles:", insId, period, n)
- var year, month, day int
- fmt.Sscanf(beginStr, "%04d%02d%02d", &year, &month, &day)
- var timelist []string
- dir := path.Join(dsb.dir, insId, fmt.Sprint(year))
- for {
- if _, err := os.Stat(dir); os.IsNotExist(err) {
- year++
- if year > time.Now().Year() {
- break
- }
- dir = path.Join(dsb.dir, insId, fmt.Sprint(year))
- continue
- }
- tl := getTimeList(dir, period, beginStr)
- timelist = append(timelist, tl...)
- year++
- if year > time.Now().Year() {
- break
- }
- dir = path.Join(dsb.dir, insId, fmt.Sprint(year))
- }
- return timelist, nil
- }
- func getTimeList(dir, period, beginStr string) []string {
- var timelist []string
- suffix := fmt.Sprintf(".%s.gz", period)
- infos, _ := ioutil.ReadDir(dir)
- for i := 0; i < len(infos); i++ {
- name := infos[i].Name()
- if strings.HasSuffix(name, suffix) {
- time := strings.Split(name, ".")[0]
- if time >= beginStr {
- timelist = append(timelist, time)
- }
- }
- }
- return timelist
- }
- func (dsb *DSBase) GetCacheCandles(insId string, period int) ([]Candle, error) {
- // log.Println("@@@: DSBase.GetLastCandles:", insId, period, n)
- // defer log.Println("###: DSBase.GetLastCandles:", insId, period, n)
- buf, ok := dsb.getBuf(insId, period)
- if !ok {
- msg := fmt.Sprintf("GetLastCandles error: %s insId is NOT in fzm exchange", insId)
- return nil, errors.New(msg)
- }
- buf.Lock()
- defer buf.Unlock()
- return buf.Buf[:], nil
- }
- func (dsb *DSBase) GetCacheTicks(insId string) ([]Tick, error) {
- dsb.tmu.Lock()
- defer dsb.tmu.Unlock()
- buf, ok := dsb.tmap[insId]
- if !ok {
- msg := fmt.Sprintf("GetLastTicks error: %s insId is NOT in fzm exchange", insId)
- return nil, errors.New(msg)
- }
- buf.Lock()
- defer buf.Unlock()
- return buf.buf[:], nil
- }
- func (dsb *DSBase) getBuf(insId string, period int) (*CandleBuf, bool) {
- dsb.cmu.Lock()
- defer dsb.cmu.Unlock()
- bufMap, ok := dsb.cmap[period]
- if !ok {
- return nil, false
- }
- buf, ok := bufMap[insId]
- if !ok {
- return nil, false
- }
- return buf, true
- }
- // 删除不需要的insId
- func (dsb *DSBase) Del(insId string) {
- dsb.chId <- insId
- }
- func (dsb *DSBase) Save(m *Market) {
- select {
- case dsb.chMs <- m:
- default:
- //log.Println("@@@:Save:", m.InsId, m.LastPrice)
- }
- }
- func (dsb *DSBase) SaveL(m *Market) {
- dsb.chMl <- m
- //select {
- //case dsb.chMl <- m:
- //default:
- //log.Println("@@@:Save:", m.InsId, m.LastPrice)
- //}
- }
- type candleInfo struct {
- c Candle
- insId string
- period int
- }
- type readerInfo struct {
- r *candleBuffer
- insId string
- period int
- prev *Candle
- }
- func (dsb *DSBase) NewCandleMaker(insId string) *BufCandleMaker {
- // tick 缓存
- var tbuf *tickBuf
- var ok bool
- dsb.tmu.Lock()
- if tbuf, ok = dsb.tmap[insId]; !ok {
- tbuf = &tickBuf{}
- dsb.tmap[insId] = tbuf
- // log.Println("@@@: dsb.tmap", m.InsId)
- }
- dsb.tmu.Unlock()
- // candle 不同周期缓存
- var cbufs []*CandleBuf
- dsb.cmu.Lock()
- for _, period := range periodSet {
- if _, ok := dsb.cmap[period]; !ok {
- log.Fatal("_, ok := dsb.cmap[period] error")
- }
- var cbuf *CandleBuf
- if cbuf, ok = dsb.cmap[period][insId]; !ok {
- cbuf = &CandleBuf{}
- dsb.cmap[period][insId] = cbuf
- // log.Println("@@@: dsb.cmap", m.InsId, period)
- }
- cbufs = append(cbufs, cbuf)
- }
- dsb.cmu.Unlock()
- candleGenerators := make([]*base.Candle, len(periodSet))
- ohlcs := make([]base.Ohlc, len(periodSet))
- for i, period := range periodSet {
- candleGenerators[i], _ = base.NewCandle(period, 2, nil, 0)
- if strings.HasPrefix(insId, Ctp) {
- candleGenerators[i].Set(base.CANDLE_AUTOCOMPLETE_MAX, 1)
- }
- ohlcs[i] = base.Ohlc{}
- }
- return &BufCandleMaker{
- candleGenerators: candleGenerators[:],
- ohlcs: ohlcs[:],
- tbuf: tbuf,
- cbufs: cbufs,
- }
- }
- func (dsb *DSBase) SaveAllTicks() {
- dsb.tmu.Lock()
- defer dsb.tmu.Unlock()
- for i, v := range dsb.tmap {
- var saveTicks []Tick
- v.Lock()
- saveTicks = v.buf[:]
- v.buf = v.buf[0:0]
- v.Unlock()
- //log.Println("SaveAllTicks saving", i)
- SaveTickEx(dsb.dir, saveTicks, i, false)
- //if err != nil {
- //log.Println(fname, err)
- //}
- }
- }
- func (dsb *DSBase) SaveTicks() {
- for {
- insId := <-dsb.saveCh
- dsb.tmu.Lock()
- tbuf, ok := dsb.tmap[insId]
- dsb.tmu.Unlock()
- if ok {
- var saveTicks []Tick
- tbuf.Lock()
- if len(tbuf.buf) >= 2000 {
- saveTicks = tbuf.buf[:1000]
- tbuf.buf = tbuf.buf[1000:]
- }
- tbuf.Unlock()
- SaveTickEx(dsb.dir, saveTicks, insId, false)
- //if err != nil {
- //log.Println(fname, err)
- //}
- }
- }
- }
- func (dsb *DSBase) DoReadEx() error {
- go dsb.SaveTicks()
- for {
- var bLoaded bool
- var m *Market
- select {
- case m = <-dsb.chMs:
- bLoaded = false
- case m = <-dsb.chMl:
- bLoaded = true
- }
- var t *Tick
- if InsIdPrefix(m.InsId) == Lmax && !bLoaded {
- t = Market2TickByBid(m)
- } else {
- t = Market2Tick(m)
- }
- bcm, ok := dsb.bufCandleMakersMap[m.InsId]
- if !ok {
- bcm = dsb.NewCandleMaker(m.InsId)
- dsb.bufCandleMakersMap[m.InsId] = bcm
- }
- if !bLoaded {
- bSave := bcm.tbuf.add(t)
- if bSave {
- dsb.saveCh <- m.InsId
- }
- }
- for i, candleGenerator := range bcm.candleGenerators {
- tg := Tk2Tg(*t)
- num := candleGenerator.UpdateTick((*base.Tick)(unsafe.Pointer(&tg)))
- var candles []Candle
- if num == 0 {
- candleGenerator.Next(&bcm.ohlcs[i])
- ohlcGo := bcm.ohlcs[i].ToGOStruct()
- candles = append(candles, OhlcGo2Candle(ohlcGo))
- } else if num > 0 {
- for j := 0; j < num; j++ {
- candleGenerator.Next(&bcm.ohlcs[i])
- ohlcGo := bcm.ohlcs[i].ToGOStruct()
- candles = append(candles, OhlcGo2Candle(ohlcGo))
- }
- } else {
- //log.Println("tick error.")
- }
- for _, v := range candles {
- last := bcm.cbufs[i].Last()
- if last != nil && last.Timestamp == v.Timestamp {
- *last = v
- } else {
- bcm.cbufs[i].add(&v, periodSet[i])
- }
- }
- }
- }
- return nil
- }
- type tbuf struct {
- tb *tickBuf
- insId string
- }
- type cbuf struct {
- cb *CandleBuf
- insId string
- period int
- }
- var periodSet = []int{M1, M5, H1, D1}
- func SaveCandlesEx(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) {
- if len(candles) == 0 {
- return "", nil
- }
- t := time.Unix(candles[0].Timestamp/1000, 0)
- dir := path.Join(dataDir, insId)
- if period < D1 {
- dir = path.Join(dir, fmt.Sprint(t.Year()))
- }
- os.MkdirAll(dir, 0777)
- var bname string
- if period < D1 {
- bname = fmt.Sprintf("%04d%02d%02d.%s.gz", t.Year(), t.Month(), t.Day(), PeriodNameMap[period])
- } else {
- bname = fmt.Sprintf("%s.gz", PeriodNameMap[period])
- }
- fname := path.Join(dir, bname)
- if !bTruncate {
- candles, _ = combinEx(fname, candles)
- }
- // 新建并写入文件
- w, err := os.Create(fname)
- if err != nil {
- return "", errors.New("SaveCandles os.Create error:" + err.Error())
- }
- defer w.Close()
- err = ZipCBuf(w, candles)
- if err != nil {
- return "", errors.New("SaveCandles ZipCBuf error:" + err.Error())
- }
- return fname, nil
- }
- func SaveCandlesTmp(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) {
- if period == D1 {
- return saveCandlesTmp(dataDir, insId, candles, period, bTruncate)
- }
- if len(candles) == 0 {
- return "", nil
- }
- oneDay := int64(1000 * 3600 * 24)
- baseT := candles[0].Timestamp / oneDay
- begin := 0
- for k, v := range candles {
- tmpBaseT := v.Timestamp / oneDay
- if tmpBaseT != baseT {
- saveCandlesTmp(dataDir, insId, candles[begin:k], period, bTruncate)
- //log.Println(fname, err)
- begin = k
- baseT = tmpBaseT
- }
- }
- return saveCandlesTmp(dataDir, insId, candles[begin:], period, bTruncate)
- }
- func saveCandlesTmp(dataDir, insId string, candles []Candle, period int, bTruncate bool) (string, error) {
- if len(candles) == 0 {
- return "", nil
- }
- t := time.Unix(candles[0].Timestamp/1000, 0).UTC()
- dir := path.Join(dataDir, insId)
- if period < D1 {
- dir = path.Join(dir, fmt.Sprint(t.Year()))
- }
- os.MkdirAll(dir, 0777)
- var bname string
- if period < D1 {
- bname = fmt.Sprintf("%04d%02d%02d.%s.gz", t.Year(), t.Month(), t.Day(), PeriodNameMap[period])
- } else {
- bname = fmt.Sprintf("%s.gz", PeriodNameMap[period])
- }
- fname := path.Join(dir, bname)
- if period != D1 {
- tmpfname := fname
- fname = tmpfname + ".tmp"
- if _, err := os.Stat(fname); os.IsNotExist(err) {
- if _, err := os.Stat(tmpfname); err == nil {
- for err = os.Rename(tmpfname, fname); err != nil; err = os.Rename(tmpfname, fname) {
- time.Sleep(time.Second)
- }
- }
- }
- }
- if !bTruncate {
- candles, _ = combinEx(fname, candles)
- }
- // 新建并写入文件
- w, err := os.Create(fname)
- if err != nil {
- return "", errors.New("SaveCandles os.Create error:" + err.Error())
- }
- defer w.Close()
- err = ZipCBuf(w, candles)
- if err != nil {
- return "", errors.New("SaveCandles ZipCBuf error:" + err.Error())
- }
- //if period != D1 {
- //if _, err := os.Stat(tmpfname); os.IsNotExist(err) {
- //if _, err := os.Stat(fname); err == nil {
- //for err = os.Rename(fname, tmpfname); err != nil; err = os.Rename(fname, tmpfname) {
- //time.Sleep(time.Second)
- //}
- //}
- //}
- //}
- return fname, nil
- }
- func convCandles0(ticks []Tick, insId string, period int) ([]Candle, error) {
- r := NewTickBuf(ticks)
- return TickConvCandle(r, insId, period)
- }
- func convCandles1(candles []Candle, insId string, period int) ([]Candle, error) {
- r := NewCandleBuf(candles)
- return ConvPeriod(r, insId, period)
- }
|