123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- package main
- //读取文件,生成 base.TickGo
- //Open(dir, instrumentId)
- //Read() (*base.TickGo, error)
- import "fmt"
- import "io"
- import "os"
- import "lmaxapi/base"
- import "log"
- import "strings"
- import "compress/gzip"
- import "encoding/csv"
- import "strconv"
- import "errors"
- import "lmaxapi/markinfo"
- import "flag"
- import "runtime/pprof"
- //import "runtime"
- import "time"
- type TickRead struct {
- tickch chan *base.TickGo
- errch chan error
- }
- func (tr *TickRead) Read() (*base.TickGo, error) {
- tick := <-tr.tickch
- if tick == nil {
- return nil, <-tr.errch
- }
- return tick, nil
- }
- type CandleRead struct {
- candlech chan *base.OhlcGo
- errch chan error
- }
- func (tr *CandleRead) Read() (*base.OhlcGo, error) {
- candle := <-tr.candlech
- if candle == nil {
- return nil, <-tr.errch
- }
- return candle, nil
- }
- var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
- func main() {
- //runtime.GOMAXPROCS(4)
- s := time.Now()
- defer func() {
- log.Println(time.Now().Sub(s))
- }()
- flag.Parse()
- if *cpuprofile != "" {
- f, err := os.Create(*cpuprofile)
- if err != nil {
- log.Fatal(err)
- }
- pprof.StartCPUProfile(f)
- defer func() {
- log.Println("stop pprof.")
- pprof.StopCPUProfile()
- }()
- }
- reader, err := m1Reader()
- if err != nil {
- log.Println(err)
- return
- }
- symbolId, err := markinfo.BookIdToSymbolId(4001)
- if err != nil {
- log.Println(err)
- return
- }
- //货币对
- //周期
- //初始K线(已经生成了部分k线, 在这个基础上继续生成)
- //时区,默认是从 GMT+0 到 GMT+0
- //数据源,支持两种接口:base.TickReader base.CandleReader
- gen, err := base.NewCandleGenerate(symbolId, base.H1, nil, nil, reader)
- if err != nil {
- log.Println(err)
- return
- }
- //一直读取
- var prev *base.OhlcGo
- for {
- //这里会输出所有的k线生成的过程,每次tick跳动,就会生成一个ohlc
- ohlc, err := gen.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Println(err)
- break
- }
- if prev == nil {
- prev = ohlc
- }
- if prev.Time != ohlc.Time { //产生新的K线
- // log.Println(prev)
- }
- prev = ohlc
- }
- log.Println("end")
- }
- func m1Reader() (*CandleRead, error) {
- reader, err := OpenDir("./lmax", 4001)
- if err != nil {
- return nil, err
- }
- symbolId, err := markinfo.BookIdToSymbolId(4001)
- if err != nil {
- return nil, err
- }
- //货币对
- //周期
- //初始K线(已经生成了部分k线, 在这个基础上继续生成)
- //时区,默认是从 GMT+0 到 GMT+0
- //数据源,支持两种接口:base.TickReader base.CandleReader
- gen, err := base.NewCandleGenerate(symbolId, base.S15, nil, nil, reader)
- if err != nil {
- return nil, err
- }
- ch := make(chan *base.OhlcGo, 1024)
- errch := make(chan error)
- reader2 := &CandleRead{}
- reader2.candlech = ch
- reader2.errch = errch
- //一直读取
- go func() {
- var prev *base.OhlcGo
- for {
- //这里会输出所有的k线生成的过程,每次tick跳动,就会生成一个ohlc
- ohlc, err := gen.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Println(err)
- break
- }
- if prev == nil {
- prev = ohlc
- reader2.candlech <- ohlc
- }
- if prev.Time != ohlc.Time { //产生新的K线
- t := time.Unix(int64(prev.Time), 0)
- log.Println(t, prev.Open == prev.Close, prev.High == prev.Low, prev.RealVolumn, prev.TickVolumn)
- reader2.candlech <- prev
- }
- prev = ohlc
- }
- // log.Println("end:", prev)
- }()
- return reader2, nil
- }
- func OpenDir(dir string, instrumentId int64) (*TickRead, error) {
- files, err := getfilelist(dir+"/marketdata/orderbook/"+fmt.Sprint(instrumentId), ".csv.gz")
- if err != nil {
- return nil, err
- }
- ch := make(chan *base.TickGo, 1)
- errch := make(chan error)
- reader := &TickRead{}
- reader.tickch = ch
- reader.errch = errch
- i := 0
- go func() {
- for _, file := range files {
- err := readGzipCsv(file, instrumentId, func(ti *base.TickGo) {
- t := time.Unix(int64(ti.Time), 0)
- log.Println("@@@", t, ti.Ask, ti.Bid, ti.Askv, ti.Bidv)
- ch <- ti
- })
- i++
- if i == 20 {
- break
- }
- if err == nil || err == io.EOF {
- continue
- }
- ch <- nil
- errch <- err
- break
- }
- //发送结束标记
- ch <- nil
- errch <- io.EOF
- }()
- return reader, nil
- }
- //采用广度优先算法,遍历文件
- func getfilelist(dir string, ext string) ([]string, error) {
- queue := make([]string, 0)
- files := make([]string, 0)
- queue = append(queue, dir)
- for len(queue) > 0 {
- top := queue[0]
- queue = queue[1:]
- //readdir
- handle, err := os.Open(top)
- if err != nil {
- return nil, err
- }
- defer handle.Close()
- fis, err := handle.Readdir(0)
- if err != nil {
- return nil, err
- }
- for _, fi := range fis {
- path := top + "/" + fi.Name()
- if fi.IsDir() {
- queue = append(queue, path)
- } else if strings.HasSuffix(path, ext) {
- files = append(files, path)
- }
- }
- }
- return files, nil
- }
- func readGzipCsv(file string, instrumentId int64, cb func(*base.TickGo)) error {
- handle, err := os.Open(file)
- if err != nil {
- return err
- }
- defer handle.Close()
- reader, err := gzip.NewReader(handle)
- if err != nil {
- return err
- }
- defer reader.Close()
- csvreader := csv.NewReader(reader)
- log.Println(file)
- first := true
- for {
- data, err := csvreader.Read()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- continue
- }
- if first {
- first = false
- continue
- }
- if len(data) > 1 && data[1] == "" {
- continue
- }
- //第一行 || 空行 || 不合格的数据
- if data[0] == "" || len(data) != 5 {
- continue
- }
- //处理数据
- ti, err := toTickGo(data, int(instrumentId))
- if err != nil {
- return err
- }
- cb(ti)
- }
- return nil
- }
- func toTickGo(data []string, instrumentId int) (*base.TickGo, error) {
- tick := &base.TickGo{}
- if len(data) != 5 {
- return nil, errors.New("error data format.")
- }
- symbolId, err := markinfo.BookIdToSymbolId(instrumentId)
- if err != nil {
- return nil, errors.New("error instrumentId.")
- }
- timestamp, err := strconv.ParseInt(data[0], 10, 64)
- if err != nil {
- return nil, err
- }
- tick.Symbol = int16(symbolId)
- tick.Time = int32(timestamp / 1000)
- tick.Ms = int16(timestamp % 1000)
- bid, err := strconv.ParseFloat(data[1], 64)
- if err != nil {
- return nil, err
- }
- bidv, err := strconv.ParseFloat(data[2], 64)
- if err != nil {
- return nil, err
- }
- ask, err := strconv.ParseFloat(data[3], 64)
- if err != nil {
- return nil, err
- }
- askv, err := strconv.ParseFloat(data[4], 64)
- if err != nil {
- return nil, err
- }
- tick.Bid = float32(bid)
- tick.Ask = float32(ask)
- tick.Bidv = int32(bidv)
- tick.Askv = int32(askv)
- return tick, nil
- }
|