package main //读取文件,生成 base.TickGo //Open(dir, instrumentId) //Read() (*base.TickGo, error) import "fmt" import "io" import "os" import "lmaxapi/base" import "log" import "strings" import "compress/gzip" import "encoding/csv" import "strconv" import "errors" import "lmaxapi/markinfo" import "flag" import "runtime/pprof" //import "runtime" import "time" type TickRead struct { tickch chan *base.TickGo errch chan error } func (tr *TickRead) Read() (*base.TickGo, error) { tick := <-tr.tickch if tick == nil { return nil, <-tr.errch } return tick, nil } type CandleRead struct { candlech chan *base.OhlcGo errch chan error } func (tr *CandleRead) Read() (*base.OhlcGo, error) { candle := <-tr.candlech if candle == nil { return nil, <-tr.errch } return candle, nil } var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") func main() { //runtime.GOMAXPROCS(4) s := time.Now() defer func() { log.Println(time.Now().Sub(s)) }() flag.Parse() if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { log.Fatal(err) } pprof.StartCPUProfile(f) defer func() { log.Println("stop pprof.") pprof.StopCPUProfile() }() } reader, err := m1Reader() if err != nil { log.Println(err) return } symbolId, err := markinfo.BookIdToSymbolId(4001) if err != nil { log.Println(err) return } //货币对 //周期 //初始K线(已经生成了部分k线, 在这个基础上继续生成) //时区,默认是从 GMT+0 到 GMT+0 //数据源,支持两种接口:base.TickReader base.CandleReader gen, err := base.NewCandleGenerate(symbolId, base.H1, nil, nil, reader) if err != nil { log.Println(err) return } //一直读取 var prev *base.OhlcGo for { //这里会输出所有的k线生成的过程,每次tick跳动,就会生成一个ohlc ohlc, err := gen.Read() if err == io.EOF { break } if err != nil { log.Println(err) break } if prev == nil { prev = ohlc } if prev.Time != ohlc.Time { //产生新的K线 // log.Println(prev) } prev = ohlc } log.Println("end") } func m1Reader() (*CandleRead, error) { reader, err := OpenDir("./lmax", 4001) if err != nil { return nil, err } symbolId, err := markinfo.BookIdToSymbolId(4001) if err != nil { return nil, err } //货币对 //周期 //初始K线(已经生成了部分k线, 在这个基础上继续生成) //时区,默认是从 GMT+0 到 GMT+0 //数据源,支持两种接口:base.TickReader base.CandleReader gen, err := base.NewCandleGenerate(symbolId, base.S15, nil, nil, reader) if err != nil { return nil, err } ch := make(chan *base.OhlcGo, 1024) errch := make(chan error) reader2 := &CandleRead{} reader2.candlech = ch reader2.errch = errch //一直读取 go func() { var prev *base.OhlcGo for { //这里会输出所有的k线生成的过程,每次tick跳动,就会生成一个ohlc ohlc, err := gen.Read() if err == io.EOF { break } if err != nil { log.Println(err) break } if prev == nil { prev = ohlc reader2.candlech <- ohlc } if prev.Time != ohlc.Time { //产生新的K线 t := time.Unix(int64(prev.Time), 0) log.Println(t, prev.Open == prev.Close, prev.High == prev.Low, prev.RealVolumn, prev.TickVolumn) reader2.candlech <- prev } prev = ohlc } // log.Println("end:", prev) }() return reader2, nil } func OpenDir(dir string, instrumentId int64) (*TickRead, error) { files, err := getfilelist(dir+"/marketdata/orderbook/"+fmt.Sprint(instrumentId), ".csv.gz") if err != nil { return nil, err } ch := make(chan *base.TickGo, 1) errch := make(chan error) reader := &TickRead{} reader.tickch = ch reader.errch = errch i := 0 go func() { for _, file := range files { err := readGzipCsv(file, instrumentId, func(ti *base.TickGo) { t := time.Unix(int64(ti.Time), 0) log.Println("@@@", t, ti.Ask, ti.Bid, ti.Askv, ti.Bidv) ch <- ti }) i++ if i == 20 { break } if err == nil || err == io.EOF { continue } ch <- nil errch <- err break } //发送结束标记 ch <- nil errch <- io.EOF }() return reader, nil } //采用广度优先算法,遍历文件 func getfilelist(dir string, ext string) ([]string, error) { queue := make([]string, 0) files := make([]string, 0) queue = append(queue, dir) for len(queue) > 0 { top := queue[0] queue = queue[1:] //readdir handle, err := os.Open(top) if err != nil { return nil, err } defer handle.Close() fis, err := handle.Readdir(0) if err != nil { return nil, err } for _, fi := range fis { path := top + "/" + fi.Name() if fi.IsDir() { queue = append(queue, path) } else if strings.HasSuffix(path, ext) { files = append(files, path) } } } return files, nil } func readGzipCsv(file string, instrumentId int64, cb func(*base.TickGo)) error { handle, err := os.Open(file) if err != nil { return err } defer handle.Close() reader, err := gzip.NewReader(handle) if err != nil { return err } defer reader.Close() csvreader := csv.NewReader(reader) log.Println(file) first := true for { data, err := csvreader.Read() if err == io.EOF { return nil } if err != nil { continue } if first { first = false continue } if len(data) > 1 && data[1] == "" { continue } //第一行 || 空行 || 不合格的数据 if data[0] == "" || len(data) != 5 { continue } //处理数据 ti, err := toTickGo(data, int(instrumentId)) if err != nil { return err } cb(ti) } return nil } func toTickGo(data []string, instrumentId int) (*base.TickGo, error) { tick := &base.TickGo{} if len(data) != 5 { return nil, errors.New("error data format.") } symbolId, err := markinfo.BookIdToSymbolId(instrumentId) if err != nil { return nil, errors.New("error instrumentId.") } timestamp, err := strconv.ParseInt(data[0], 10, 64) if err != nil { return nil, err } tick.Symbol = int16(symbolId) tick.Time = int32(timestamp / 1000) tick.Ms = int16(timestamp % 1000) bid, err := strconv.ParseFloat(data[1], 64) if err != nil { return nil, err } bidv, err := strconv.ParseFloat(data[2], 64) if err != nil { return nil, err } ask, err := strconv.ParseFloat(data[3], 64) if err != nil { return nil, err } askv, err := strconv.ParseFloat(data[4], 64) if err != nil { return nil, err } tick.Bid = float32(bid) tick.Ask = float32(ask) tick.Bidv = int32(bidv) tick.Askv = int32(askv) return tick, nil }