123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package market
- // 本文件实现DataSource数据源的tick数据获取下载和保存
- import (
- "errors"
- "fmt"
- "log"
- "os"
- "path"
- "strings"
- "sync"
- "time"
- "dev.33.cn/framework/base"
- "dev.33.cn/framework/event"
- )
// Debug selects the test schedule in RunSave: when true, cached data is
// saved once per hour instead of at the daily 05:00 slot.
var Debug bool
// InsName maps an instrument id string to a name string.
type InsName interface {
	// GetInsName returns the name associated with insId.
	GetInsName(insId string) string
}
- // 数据源接口
- // lmax, easyforex, oanda, ctp以及未来的macoin等均实现此接口
- type DataSource interface {
- SubIns() *event.Event
- Run()
- GetLastTicks(insId string, n int) ([]Tick, error)
- GetLastCandles(insId string, period, n int) ([]Candle, error)
- }
- type DSBase struct {
- db *MyDB
- dir string
- chM chan *Market
- chId chan string
- cmu sync.Mutex // 保护camp
- cmap map[int]map[string]*candleBuf // 缓存最新的Candle
- tmu sync.Mutex // 保护tmap
- tmap map[string]*tickBuf // 缓存最新的Tick
- }
- func makeCandleMap() map[int]map[string]*candleBuf {
- cmap := make(map[int]map[string]*candleBuf)
- for k, _ := range basePeriodSet {
- cmap[k] = make(map[string]*candleBuf)
- }
- return cmap
- }
- func NewDsBase(db *MyDB, dir string) *DSBase {
- dsb := &DSBase{
- db: db,
- dir: dir,
- chId: make(chan string, 1),
- chM: make(chan *Market, 10240),
- cmap: makeCandleMap(),
- tmap: make(map[string]*tickBuf),
- }
- return dsb
- }
- func (dsb *DSBase) GetLastCandles(insId string, period, n int) ([]Candle, error) {
- log.Println("@@@: DSBase.GetLastCandles:", insId, period, n)
- defer log.Println("###: DSBase.GetLastCandles:", insId, period, n)
- buf, ok := dsb.getBuf(insId, period)
- if !ok {
- msg := fmt.Sprintf("GetLastCandles error: %s insId is NOT in fzm exchange", insId)
- return nil, errors.New(msg)
- }
- buf.Lock()
- defer buf.Unlock()
- p := len(buf.buf) - n
- if p < 0 {
- p = 0
- }
- return buf.buf[p:], nil
- }
- func (dsb *DSBase) GetLastTicks(insId string, n int) ([]Tick, error) {
- dsb.tmu.Lock()
- defer dsb.tmu.Unlock()
- buf, ok := dsb.tmap[insId]
- if !ok {
- msg := fmt.Sprintf("GetLastTicks error: %s insId is NOT in fzm exchange", insId)
- return nil, errors.New(msg)
- }
- buf.Lock()
- defer buf.Unlock()
- p := len(buf.buf) - n
- if p < 0 {
- p = 0
- }
- return buf.buf[p:], nil
- }
- func (dsb *DSBase) getBuf(insId string, period int) (*candleBuf, bool) {
- dsb.cmu.Lock()
- defer dsb.cmu.Unlock()
- bufMap, ok := dsb.cmap[period]
- if !ok {
- return nil, false
- }
- buf, ok := bufMap[insId]
- if !ok {
- return nil, false
- }
- return buf, true
- }
- // 删除不需要的insId
- func (dsb *DSBase) Del(insId string) {
- dsb.chId <- insId
- }
- func (dsb *DSBase) Save(m *Market) {
- select {
- case dsb.chM <- m:
- default:
- log.Println("@@@:Save:", m.InsId, m.LastPrice)
- }
- }
- type candleInfo struct {
- c Candle
- insId string
- period int
- }
- type readerInfo struct {
- r *candleBuffer
- insId string
- period int
- prev *Candle
- }
- // 每小时检查一次, 删除超过n数据
- func (dsb *DSBase) saveCheck() {
- ticker := time.Tick(time.Hour)
- for _ = range ticker {
- dsb.cmu.Lock()
- for _, m := range dsb.cmap {
- for _, buf := range m {
- n := 1440 // M1一天的, 其他周期不论
- buf.Lock()
- l := len(buf.buf)
- if l > n {
- buf.buf = buf.buf[l-n:]
- }
- buf.Unlock()
- }
- }
- dsb.cmu.Unlock()
- dsb.tmu.Lock()
- for insId, buf := range dsb.tmap {
- dsName := InsIdPrefix(insId)
- n := 0
- switch dsName {
- case Lmax, Saxo:
- n = 3600 * 10 * 6 // 大约缓存6小时
- case Ctp, Dzh:
- n = 3600 * 2 * 3 // 大约缓存3小时
- default:
- n = 8640 // easyforex等 大约1天
- }
- buf.Lock()
- l := len(buf.buf)
- if l > n {
- buf.buf = buf.buf[l-n:]
- }
- buf.Unlock()
- }
- dsb.tmu.Unlock()
- }
- }
- // 保存到内存并保存为文件
- func (dsb *DSBase) RunSave(n int) {
- log.Println("@@@:RunSave:", n)
- tcCh := make(chan *tconver, 40960) // 足够大,包括所有的品种
- go dsb.doRead(tcCh)
- for i := 0; i < n; i++ {
- go dsb.doConv(tcCh)
- }
- // 保存数据到文件
- if dsb.db != nil {
- if Debug { // 测试时每小时保存一次
- ticker := time.Tick(time.Hour)
- for _ = range ticker {
- dsb.doSave()
- }
- } else { // 每天定时保存
- ticker := time.Tick(time.Second * 30)
- for t := range ticker {
- if t.Hour() == 5 && t.Minute() == 0 { // 5:00点时保存
- dsb.doSave()
- dsb.saveCheck()
- }
- }
- }
- } else {
- dsb.saveCheck()
- }
- }
- type tbuf struct {
- tb *tickBuf
- insId string
- }
- type cbuf struct {
- cb *candleBuf
- insId string
- period int
- }
- // 把缓存数据写入文件, 所有K线周期数据都用缓存
- func (dsb *DSBase) doSave0() {
- tCh := make(chan tbuf, 8192)
- cCh := make(chan cbuf, 8192*4)
- for i := 0; i < 4; i++ {
- go func() {
- for {
- select {
- case t := <-tCh:
- tb := t.tb
- tb.Lock()
- dsb.saveTick(tb.buf, t.insId, true)
- tb.Unlock()
- case c := <-cCh:
- cb := c.cb
- cb.Lock()
- dsb.saveCandle(c.insId, c.period, cb.buf)
- cb.Unlock()
- }
- }
- }()
- }
- dsb.tmu.Lock()
- for insId, tb := range dsb.tmap {
- tCh <- tbuf{tb, insId}
- }
- dsb.tmu.Unlock()
- close(tCh)
- dsb.cmu.Lock()
- for period, m := range dsb.cmap {
- for insId, cb := range m {
- cCh <- cbuf{cb, insId, period}
- }
- }
- dsb.cmu.Unlock()
- close(cCh)
- }
- // 把缓存数据写入文件, K线数据使用tick转换
- func (dsb *DSBase) doSave() {
- ch := make(chan tbuf, 8192)
- for i := 0; i < 4; i++ {
- go func() {
- for t := range ch {
- tb := t.tb
- insId := t.insId
- fname, err := dsb.saveTick(tb.buf, insId, true)
- if err == nil {
- convAndSaveCandles(dsb.db, insId, fname, tb.buf)
- }
- }
- }()
- }
- dsb.tmu.Lock()
- for insId, tb := range dsb.tmap {
- ch <- tbuf{tb, insId}
- }
- dsb.tmu.Unlock()
- close(ch)
- }
- // 从chR中读出Candle, 周期转换, 存储到chC中
- func (dsb *DSBase) doConv0(chR chan *readerInfo) {
- for {
- ri := <-chR
- select {
- case ce := <-ri.r.ch:
- c, err := ce.candle, ce.err
- if err != nil {
- log.Println(err)
- break // break select
- }
- dsb.cmu.Lock()
- cbuf, ok := dsb.cmap[ri.period][ri.insId]
- dsb.cmu.Unlock()
- if !ok {
- break // break select
- }
- if cbuf.leng() == 0 {
- cbuf.add(c)
- }
- last := cbuf.last()
- if last.Timestamp != c.Timestamp {
- cbuf.add(c)
- } else {
- *last = *c
- }
- default:
- time.Sleep(time.Millisecond * 1)
- }
- chR <- ri
- }
- }
- // 从chR中读出Candle, 周期转换, 存储到chC中
- // tc.ch 保证了tick的顺序, 保证每个tconver(tc)的ch 同时只有一个goroutine在操作
- // 这样doConv就可以并发执行
- func (dsb *DSBase) doConv(tcCh chan *tconver) {
- for {
- tc := <-tcCh
- dsb.cmu.Lock()
- cbuf, ok := dsb.cmap[tc.period][tc.insId]
- dsb.cmu.Unlock()
- if ok {
- select {
- case t := <-tc.ch:
- if t == nil {
- break // break select
- }
- c := tc.convEx(t)
- if tc.period == M1 {
- //testCandle(0, nil, t, c, true)
- }
- for _, v := range c {
- if v == nil {
- break
- }
- if cbuf.leng() == 0 {
- cbuf.add(v)
- }
- last := cbuf.last()
- if last.Timestamp != v.Timestamp {
- cbuf.add(v)
- } else {
- *last = *v
- }
- }
- default:
- time.Sleep(time.Microsecond * 1)
- }
- }
- tcCh <- tc // 完成后send back, 其他goroutine recv
- }
- }
- var periodSet = []int{M1, M5, H1, D1}
- // 从dsb.chM读出实时行情数据, 分别缓存Tick和做Candle周期转换
- func (dsb *DSBase) doRead(tcCh chan *tconver) {
- mapTB := make(map[string]map[int]*tconver)
- for {
- m := <-dsb.chM
- var t *Tick
- if InsIdPrefix(m.InsId) == Lmax {
- t = Market2TickByBid(m)
- } else {
- t = Market2Tick(m)
- }
- // tick 缓存
- dsb.tmu.Lock()
- if _, ok := dsb.tmap[m.InsId]; !ok {
- dsb.tmap[m.InsId] = &tickBuf{}
- log.Println("@@@: dsb.tmap", m.InsId)
- }
- dsb.tmap[m.InsId].add(t)
- dsb.tmu.Unlock()
- // candle 不同周期缓存
- dsb.cmu.Lock()
- for _, period := range periodSet {
- if _, ok := dsb.cmap[period]; !ok {
- log.Fatal("_, ok := dsb.cmap[period] error")
- }
- if _, ok := dsb.cmap[period][m.InsId]; !ok {
- dsb.cmap[period][m.InsId] = &candleBuf{}
- log.Println("@@@: dsb.cmap", m.InsId, period)
- }
- }
- dsb.cmu.Unlock()
- //把tick数据转到不同周期的TickBuffer做周期转换
- if _, ok := mapTB[m.InsId]; !ok {
- mapTB[m.InsId] = make(map[int]*tconver)
- for _, period := range periodSet {
- cg, _ := base.NewCandle(period, 2, nil, 0)
- tc := &tconver{ch: make(chan *Tick, 1024), cg: cg, period: period, insId: m.InsId}
- mapTB[m.InsId][period] = tc
- tcCh <- tc
- }
- }
- // 把tick数据保存到不同周期转换chan中
- for _, period := range periodSet {
- mapTB[m.InsId][period].ch <- t
- }
- }
- }
- // saveTick 把一个tick数据写入本地文件, 并把文件信息记录数据库
- func (dsb *DSBase) saveTick(ts []Tick, insId string, zip bool) (string, error) {
- if len(ts) == 0 {
- return "", errors.New("len(ts) == 0")
- }
- t0 := ts[0]
- t := time.Unix(t0.Timestamp/1000, 0)
- dir := path.Join(dsb.dir, insId, fmt.Sprint(t.Year()), fmt.Sprint(t.Month()), fmt.Sprint(t.Day()))
- os.MkdirAll(dir, 0777)
- fname := path.Join(dir, fmt.Sprintf("%d-tick.TK", time.Now().UnixNano()))
- var err error
- if zip {
- err = SaveTicks(dsb.db, "", fname, ts, insId)
- } else {
- err = SaveTicksNoZip(dsb.db, "", fname, ts, insId)
- }
- if err != nil {
- return "", err
- }
- return fname, nil
- }
- // saveCandle 把一个candle数据写入本地文件, 并把文件信息记录数据库
- func (dsb *DSBase) saveCandle(insId string, period int, candles []Candle) error {
- if len(candles) == 0 {
- return nil
- }
- t := time.Unix(candles[0].Timestamp/1000, 0)
- dir := path.Join(dsb.dir, insId, fmt.Sprint(t.Year()))
- if period == H1 {
- dir = path.Join(dir, fmt.Sprint(t.Month()))
- } else if period < H1 {
- dir = path.Join(dir, fmt.Sprint(t.Day()))
- }
- os.MkdirAll(dir, 0777)
- bname := fmt.Sprintf("%d-candle.%s", time.Now().UnixNano(), PeriodNameMap[period])
- fname := path.Join(dir, bname)
- if period < H1 {
- return SaveCandles(dsb.db, "", fname, candles, insId, period)
- }
- return SaveH1OrD1(dsb.db, "", fname, candles, insId, period)
- }
- func ConvAndSaveCandles(db *MyDB, insId, path string, ticks []Tick) error {
- refer := path
- var candles []Candle
- pa := []int{M1, M5, H1, D1}
- for _, period := range pa {
- newpath := strings.Replace(path, "tick.TK", "candle."+PeriodNameMap[period], 1)
- fi, err := os.Stat(newpath)
- if err == nil && fi.Size() > 0 {
- return nil
- }
- //var err error
- if period == M1 {
- candles, err = convCandles0(ticks, M1)
- } else {
- candles, err = convCandles1(candles, period)
- }
- if err != nil {
- log.Println("convAndSaveCandles error:", err)
- if len(candles) == 0 {
- return err
- }
- }
- err = SaveCandles(db, refer, newpath, candles, insId, period)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func convAndSaveCandles(db *MyDB, insId, path string, ticks []Tick) error {
- refer := path
- var candles []Candle
- pa := []int{M1, M5, H1, D1}
- for _, period := range pa {
- var err error
- if period == M1 {
- candles, err = convCandles0(ticks, M1)
- } else {
- candles, err = convCandles1(candles, period)
- }
- if err != nil {
- log.Println("convAndSaveCandles error:", err)
- if len(candles) == 0 {
- return err
- }
- }
- newpath := strings.Replace(path, "tick.TK", "candle."+PeriodNameMap[period], 1)
- err = SaveCandles(db, refer, newpath, candles, insId, period)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func convCandles0(ticks []Tick, period int) ([]Candle, error) {
- r := NewTickBuf(ticks)
- return TickConvCandle(r, period)
- }
- func convCandles1(candles []Candle, period int) ([]Candle, error) {
- r := NewCandleBuf(candles)
- return ConvPeriod(r, period)
- }
|