12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- // This file implements downloading, parsing, period conversion and saving of historical market data from LMAX.
- import (
- "compress/gzip"
- "encoding/csv"
- "errors"
- "fmt"
- "io"
- "log"
- "net/url"
- "os"
- "path"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
- "tickserver/api/lmaxapi"
- "tickserver/api/lmaxapi/request"
- "tickserver/api/lmaxapi/response"
- "tickserver/framework/event"
- "tickserver/server/market"
- )
- var lmaxUser = "wave2907"
- var lmaxPassWord = "Tg417396"
- var lmaxUrl = "https://trade.lmaxtrader.com"
- var lmaxInsTable = "Lmax_inss"
- var lmaxSaveDir string
- var lmaxDB *market.MyDB
- var insMap map[string]*market.Instrument
- var errcount int64
- var lmaxHisDownCh = make(chan int, 1)
- var lmaxHisParseCh = make(chan int, 1)
- var lmaxInsStartMap = make(map[string]map[int]int64)
- //var lmaxInsPublisher event.EventPublisher // 实时行情数据发布器
- var lmaxInsPublisher2 event.EventPublisher // 实时行情数据发布器
- type byTime []market.Candle
- func (a byTime) Len() int { return len(a) }
- func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
- func (a byTime) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp }
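- // Design: 4 steps: subscribe ==> download ==> parse ==> save
- // 1, A single account polls to download every instrument
- // 2, tick, M1 and D1 are polled separately
- // 3, The file list returned by a subscription is kept in order
- // 4, Several returned subscription lists may be downloaded at the same time
- // 5, Each returned file list is downloaded in sequence; a failed download is not allowed (on failure keep re-requesting the download)
- // 6, Once a list has been downloaded completely it is parsed; the list of files to parse must be in time order and complete
- // 7, Several groups of files may be parsed at the same time
- // 8, Daily and hourly data are merged ==> one daily file per product, one hourly file per product per month
- // 9, Historical data ==> binary ==> saved gzip-compressed
- // 10, The information of every file is written to the database (refer, path, start/end time and candle count)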
- func doErrCode(s *lmaxapi.Session, err error) {
- //1. 处理下面的事件
- //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启
- //1.2 session 过期,发生403错误,必须重新登录
- //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1
- // 这个时候调用 session.StreamClose() 重新启动stream
- operr, ok := err.(*lmaxapi.OpError)
- log.Println("operr:", operr)
- if !ok {
- return
- }
- //1.2
- if operr.Code == 403 {
- log.Println("stop session")
- s.Stop()
- return
- }
- //1.1 and 1.3
- //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session
- if operr.Op == "Stream" {
- log.Println("stop stream")
- if operr.Code == 0 {
- time.Sleep(1 * time.Second)
- }
- s.StopStream()
- // ss.Stop()
- return
- }
- }
- func regStreamError(ss *lmaxapi.Session) {
- // 注册数据流失败处理函数
- ss.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) {
- doErrCode(s, err)
- })
- }
- func onInstrument(value *response.Instrument) {
- ins := &market.Instrument{
- Id: market.LmaxPrefix + strconv.Itoa(int(value.Id)),
- Name: value.Name,
- ExId: market.Lmax,
- Typ: market.Forex,
- PriceInc: value.PriceIncrement,
- Margin: value.MarginRate,
- StartTime: value.StartTime.Unix() * 1000, // ms
- }
- insMap[ins.Id] = ins
- // log.Println(ins.Id, ins.Name, ins.ExId)
- //lmaxInsPublisher.Publish(ins)
- // lds.insPublisher2.Publish(ins)
- lmaxDB.InsertIns(ins, lmaxInsTable)
- }
- func login(name, password, typ, url string) (*lmaxapi.Session, error) {
- log.Println("LmaxDS.login:", name)
- lmaxapi.SetBaseUrl(url)
- req := request.NewLoginRequest(name, password, typ)
- ss, err := lmaxapi.Login(req)
- if err != nil {
- s := err.Error()
- if len(s) > 32 {
- s = s[:32]
- }
- log.Println("lmaxapi.Login error:", s)
- return nil, err
- }
- log.Println(name, "login OK!")
- return ss, nil
- }
- func DownloadLmax(dir string, db *market.MyDB) {
- var chkMu sync.Mutex
- chkSS := false
- insMap = make(map[string]*market.Instrument)
- lmaxDB = db
- lmaxSaveDir = dir
- //此 goroutine用来触发下载历史数据, 每分钟下载一个
- go func() {
- //insMap := make(map[string]*market.Instrument)
- //lmaxInsPublisher.Event().Attach(func(v interface{}) error {
- //ins := v.(*market.Instrument)
- //insMap[ins.Id] = ins
- //return nil
- //})
- for {
- <-lmaxHisDownCh
- <-lmaxHisParseCh
- for _, ins := range insMap {
- //if ins.Id != "lmax_4001" {
- //continue
- //}
- //log.Println("ins.Id", ins.Id)
- chkMu.Lock()
- if !chkSS {
- time.Sleep(time.Minute * 10) // 防止不断重复发送下载请求
- }
- chkMu.Unlock()
- //if ins.Id == "lmax_4001" {
- lmaxInsPublisher2.Publish(ins)
- //}
- time.Sleep(time.Second * 10)
- }
- time.Sleep(time.Second * 10)
- }
- }()
- fn := func() {
- for {
- chkMu.Lock()
- chkSS = false
- chkMu.Unlock()
- // 登录产生新的Session
- ss, err := login(lmaxUser, lmaxPassWord, request.ProductType.CFD_LIVE, lmaxUrl)
- if err != nil {
- time.Sleep(time.Second * 10)
- continue
- }
- ss.RegisterSessionDisconnected(func(s *lmaxapi.Session) {
- log.Println("session disconnected. STOP session!")
- s.Stop()
- })
- regStreamError(ss)
- downloadRun(ss) // 历史数据
- ss.LoadAllInstruments(func(value *response.Instrument) {
- onInstrument(value)
- //ss.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), DefaultSubscribeCB)
- })
- ss.Wait()
- // 设置心跳
- ss.HeartbeatTimeout(5 * time.Second)
- // 会话启动
- chkMu.Lock()
- chkSS = true
- chkMu.Unlock()
- ss.Start()
- // 有错误时跳出Start重启
- log.Println("lmax session RESTART !!!")
- }
- }
- fn()
- }
- func ifDuplicateDownload(insId string, period int, start int64) bool {
- stmap, ok := lmaxInsStartMap[insId]
- if ok {
- st, ok1 := stmap[period]
- if ok1 {
- if st == start {
- return true
- } else {
- lmaxInsStartMap[insId][period] = start
- return false
- }
- } else {
- lmaxInsStartMap[insId][period] = start
- return false
- }
- } else {
- lmaxInsStartMap[insId] = make(map[int]int64)
- lmaxInsStartMap[insId][period] = start
- return false
- }
- }
- func downloadRun(ss *lmaxapi.Session) {
- dlParser := newParser(1) // 解析器
- dlTask := newDownTask(lmaxDB, dlParser, lmaxSaveDir, "", 1) // 下载任务器
- ss.RegisterHistoricMarketDataEvent(func(s *lmaxapi.Session, event *response.HistoricMarketDataEvent) {
- if len(event.Urls) == 0 {
- log.Println("event.Urls:", len(event.Urls))
- return
- }
- //for i, v := range event.Urls {
- //log.Println("url:", i, v)
- //}
- dlTask.downloadUrls(ss, event.Urls)
- })
- lmaxInsPublisher2.Event().Attach(func(v interface{}) error {
- ins := v.(*market.Instrument)
- //insId := market.LmaxPrefix + strconv.Itoa(int(4001))
- //if ins.Id != insId {
- //return nil
- //}
- lmaxId := market.RealInsId(ins.Id)
- ps := []int{0, market.M1, market.D1}
- for _, period := range ps {
- downloadInsHis(ss, ins.Id, lmaxId, period)
- }
- return nil
- })
- }
- func downloadInsHis(ss *lmaxapi.Session, insId, lmaxId string, period int) {
- // log.Println("downloadInsHis", insId, lmaxId, period)
- switch period {
- case 0:
- downloadInsTicks(ss, insId, lmaxId)
- default:
- downloadInsCandles(ss, insId, lmaxId, period)
- }
- }
- // downloadInsTicks 下载tick数据
- // update == true 指示是否下载最新数据
- func downloadInsTicks(ss *lmaxapi.Session, insId, lmaxId string) {
- st := insMap[insId].StartTime / 1000
- //if market.Debug {
- //st = time.Date(2015, 1, 1, 0, 0, 0, 0, time.Local).Unix()
- //}
- //r, err := lds.db.GetLastTime(insId, 0)
- //if err == nil && r != nil {
- //log.Println("downloadInsTicks", insId, err, st, r.StartTime)
- //st = r.StartTime / 1000
- //}
- startTime, err := lmaxDB.GetInsStartTime(lmaxInsTable, insId, 0)
- startTime /= 1000
- log.Println("downloadInsTicks", insId, err, st, startTime)
- if err == nil && startTime > st {
- st = startTime
- }
- //if time.Now().Unix()-st < int64(market.D1) { //market.M1
- //return
- //}
- if ifDuplicateDownload(insId, 0, st) {
- return
- }
- log.Println("downloadInsTicks:", insId, lmaxId, st)
- iid, _ := strconv.Atoi(lmaxId)
- req := request.NewHistoricSubscriptionRequest()
- ss.Subscribe(req, func(err error) {
- if err != nil {
- // log.Println("downloadInsTicks Subscribe history request error:", err)
- doErrCode(ss, err) // 错误处理
- return
- }
- hreq := request.NewTopOfBookHistoricRequest(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV")
- log.Println(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV")
- ss.RequestHistoricMarketData(hreq, func(err error) {
- if err != nil {
- log.Println("downloadInsTicks RequestHistoricMarketData error", err)
- }
- })
- return
- })
- }
- // downloadInsCandles 下载历史K线数据
- // update == true 指示是否下载最新数据
- func downloadInsCandles(ss *lmaxapi.Session, insId, lmaxId string, period int) {
- ps := "MINUTE"
- if period == market.D1 {
- ps = "DAY"
- }
- st := insMap[insId].StartTime / 1000
- //if market.Debug {
- //st = time.Date(2015, 1, 1, 0, 0, 0, 0, time.Local).Unix()
- //}
- //r, err := lds.db.GetLastTime(insId, period)
- //if err == nil && r != nil {
- //log.Println("downloadInsCandles", insId, err, st, r.StartTime, r.EndTime, period)
- //st = r.EndTime / 1000
- //if period == market.M1 {
- //st = r.StartTime / 1000
- //}
- //}
- startTime, err := lmaxDB.GetInsStartTime(lmaxInsTable, insId, period)
- startTime /= 1000
- log.Println("downloadInsCandles", insId, err, st, startTime, period)
- if err == nil && startTime > st {
- st = startTime
- }
- //if time.Now().Unix()-st < market.D1 { //int64(period)
- //return
- //}
- if ifDuplicateDownload(insId, period, st) {
- return
- }
- log.Println("lds.downloadInsCandles", insId, lmaxId, ps, st)
- iid, _ := strconv.Atoi(lmaxId)
- req := request.NewHistoricSubscriptionRequest()
- ss.Subscribe(req, func(err error) {
- if err != nil {
- // log.Println("downloadInsCandles Subscribe error:", err)
- doErrCode(ss, err) // 错误处理
- return
- }
- hreq := request.NewAggregateHistoricRequest(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV",
- ps, []string{"BID"}) // 使用卖价
- log.Println(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV", ps, []string{"BID"})
- ss.RequestHistoricMarketData(hreq, func(err error) {
- if err != nil {
- log.Println("downloadInsCandles RequestHistoricMarketData error:", err)
- }
- })
- })
- }
- type dlWork struct {
- url string
- ss *lmaxapi.Session
- }
- // 下载任务
- type downTask struct {
- in chan []*dlWork
- db *market.MyDB
- p *parser
- dirMu sync.Mutex
- dir string
- dir1 string
- mu sync.Mutex
- wss [][]*dlWork
- }
- func (task *downTask) addws(ws []*dlWork) bool {
- task.mu.Lock()
- defer task.mu.Unlock()
- for _, s := range task.wss {
- for _, w := range s {
- if w.url == ws[0].url {
- _, _, _, err := task.parseUrl(w.url)
- if err != nil {
- return false
- }
- //r, err := task.db.GetLastTime(insId, period)
- //if err != nil {
- //return false
- //}
- //if r != nil && fname == r.Refer {
- //// log.Println("download newest url:", w.url)
- //return true
- //}
- return false
- }
- }
- }
- task.wss = append(task.wss, ws)
- return true
- }
- // downloadUrls 下载一种instrument的url列表
- // url列表是以时间顺序的, 下载完毕后, 要保证顺序给到解析器解析
- func (task *downTask) downloadUrls(ss *lmaxapi.Session, urls []string) {
- ws := make([]*dlWork, len(urls))
- for i := 0; i < len(urls); i++ {
- w := &dlWork{urls[i], ss}
- ws[i] = w
- }
- //if task.addws(ws) {
- task.in <- ws
- //}
- }
- func newDownTask(db *market.MyDB, p *parser, dir, dir1 string, max int) *downTask {
- task := &downTask{
- in: make(chan []*dlWork, 1024),
- db: db,
- p: p,
- dir: dir,
- dir1: dir1,
- }
- // 并发下载
- for i := 0; i < max; i++ {
- go task.dowork()
- }
- return task
- }
- func (task *downTask) dowork() {
- for {
- var ws []*dlWork
- select {
- case ws = <-task.in:
- default:
- select {
- case lmaxHisDownCh <- 1:
- default:
- }
- time.Sleep(time.Second * 1)
- continue
- }
- //ws := <-task.in
- //pws := []*parseWork{}
- wCount := len(ws)
- //pwCh := make(chan *parseWork, wCount)
- //errCh := make(chan error, wCount)
- resultCh := make(chan int, wCount)
- wsCh := make(chan *dlWork, 1)
- log.Println("download begin")
- go func() {
- for _, w := range ws {
- wsCh <- w
- }
- }()
- for index := 0; index < 4; index++ {
- go func() {
- for {
- w := <-wsCh
- i := 0
- for {
- log.Println("down:", w.url)
- pw, err := task.downloadOne(w.ss, w.url)
- if err != nil {
- log.Println("down error:", err, w.url)
- i++
- if i == 3 {
- resultCh <- 0
- //errCh <- err
- break
- }
- continue
- }
- var print_url, print_fname string
- if w != nil {
- print_url = w.url
- }
- if pw != nil {
- print_fname = pw.fname
- }
- log.Println("down ok:", print_url, print_fname)
- //pwCh <- pw
- if pw != nil {
- pws := []*parseWork{}
- pws = append(pws, pw)
- task.p.parse(pws)
- pws = nil
- }
- resultCh <- 1
- break
- }
- }
- }()
- }
- //bErr := false
- //var pw *parseWork
- //var err error
- for count := 0; count < len(ws); count++ {
- <-resultCh
- //select {
- //case pw = <-pwCh:
- //if pw != nil {
- //pws = append(pws, pw)
- //}
- //case err = <-errCh:
- //bErr = true
- //}
- }
- //close(wsCh)
- log.Println("download end")
- //if bErr {
- //log.Println("downloadOne error:", err, len(pws), len(ws))
- ////return
- //}
- // 解析一组文件
- //task.p.parse(pws)
- }
- }
- /*func (task *downTask) dowork() {
- for {
- ws := <-task.in
- pws := []*parseWork{}
- for _, w := range ws {
- i := 0
- for {
- pw, err := task.downloadOne(w.ss, w.url)
- if err != nil {
- i++
- if i == 3 {
- log.Printf("downloadOne error:", err)
- return
- }
- continue
- }
- if pw != nil {
- pws = append(pws, pw)
- }
- break
- }
- }
- // 解析一组文件
- task.p.parse(pws)
- }
- }*/
- func (task *downTask) parseUrl(u string) (fname, insId string, period int, e error) {
- surl, err := url.Parse(u)
- if err != nil {
- e = err
- return
- }
- bname := path.Base(surl.Path)
- ss := strings.Split(bname, "-")
- if len(ss) < 8 {
- e = errors.New("Url is error:")
- return
- }
- // 根据url文件名解析出insId和period
- insId = market.LmaxPrefix + ss[7]
- if strings.Contains(bname, "tick") {
- period = 0
- } else if strings.Contains(bname, "minute") {
- period = market.M1
- } else if strings.Contains(bname, "day") {
- period = market.D1
- } else {
- log.Println("task.downloadOne error: url is error:", u)
- }
- fname = task.dir + surl.Path
- return
- }
- func (task *downTask) downloadOne(session *lmaxapi.Session, u string) (*parseWork, error) {
- fname, insId, period, err := task.parseUrl(u)
- if err != nil {
- return nil, err
- }
- if _, err := os.Stat(fname); !os.IsNotExist(err) {
- return &parseWork{fname, insId, period}, nil //return nil, nil
- // 文件存在
- //r, err := task.db.GetLastTime(insId, period)
- //if err != nil {
- //return &parseWork{fname, insId, period}, nil
- //}
- //if r != nil && fname != r.Refer {
- //// 并不是最新的文件, 那么无需下载
- //return nil, nil
- //}
- }
- resp, err := session.OpenUrl(u, nil)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- // log.Println(u)
- // 保存到本地
- fpath := path.Dir(fname)
- os.MkdirAll(fpath, 0777)
- w, err := os.Create(fname)
- if err != nil {
- return nil, err
- }
- n, err := io.Copy(w, resp.Body)
- w.Close()
- if err != nil {
- if err != io.EOF {
- return nil, err
- }
- }
- if n == 0 {
- return nil, errors.New("io.Copy write 0 byte: " + u)
- }
- return &parseWork{fname, insId, period}, nil
- }
// parseWork is one downloaded file waiting to be parsed.
// The field order matters because values are built with positional literals.
type parseWork struct {
	fname, insId string // local file name; prefixed instrument id
	period       int    // 0 for ticks, otherwise the candle period
}
- // 解析器
- type parser struct {
- pwch chan []*parseWork // 缓存解析任务
- mu sync.Mutex
- pwss [][]*parseWork
- }
- func newParser(max int) *parser {
- fch := make(chan []*parseWork, 32)
- p := &parser{pwch: fch}
- // 指定数量的并发解析
- for i := 0; i < max; i++ {
- go p.doParse()
- }
- return p
- }
- // 避免重复解析
- func (p *parser) addpws(pws []*parseWork) bool {
- p.mu.Lock()
- defer p.mu.Unlock()
- for _, ws := range p.pwss {
- for _, pw := range ws {
- if pws[0].fname == pw.fname {
- //r, err := lmaxDB.GetLastTime(pw.insId, pw.period)
- //if err != nil {
- //return false
- //}
- //if r != nil && pw.fname == r.Refer {
- //return true
- //}
- return false
- }
- }
- }
- p.pwss = append(p.pwss, pws)
- return true
- }
- func (p *parser) parse(pws []*parseWork) {
- if len(pws) == 0 {
- return
- }
- //if p.addpws(pws) {
- p.pwch <- pws
- //}
- }
- func (p *parser) doParse() {
- for {
- var pws []*parseWork
- select {
- case pws = <-p.pwch:
- default:
- select {
- case lmaxHisParseCh <- 1:
- default:
- }
- time.Sleep(time.Second * 1)
- continue
- }
- //pws := <-p.pwch
- for _, w := range pws {
- log.Println("parse:", w.fname)
- ticks, candles, err := parseFile(w.fname, w.period)
- if err != nil {
- log.Println("lmax parseFile error:", err, w.fname)
- badname := w.fname + ".bad"
- os.Remove(badname)
- os.Rename(w.fname, badname)
- continue
- }
- log.Println("save:", w.fname)
- err = p.doSave(w.fname, w.insId, w.period, ticks, candles)
- if err != nil {
- log.Println("lmax doSave error:", err, w.fname)
- }
- }
- }
- }
- // parseFile 解析文件, refer表明文件来源
- func parseFile(refer string, period int) ([]market.Tick, []market.Candle, error) {
- // 解析为[]Tick或者[]Candle
- f, err := os.Open(refer)
- if err != nil {
- return nil, nil, errors.New("parseFile error: " + err.Error())
- }
- defer f.Close()
- return parse(f, period)
- }
- func ParseFile(refer string, period int) ([]market.Tick, []market.Candle, error) {
- return parseFile(refer, period)
- }
- func doConv(candles []market.Candle, insId string, period int) ([]market.Candle, error) {
- r := market.NewCandleBuf(candles)
- buf, err := market.ConvPeriod(r, insId, period)
- if err != nil {
- log.Println(err)
- }
- if len(buf) == 0 {
- return nil, errors.New("doConv error: result buf len is ZERO")
- }
- return buf, nil
- }
- func (p *parser) doSave(refer, insId string, period int, ticks []market.Tick, candles []market.Candle) error {
- // 保存到本地
- //for i := 0; i < len(ticks); i++ {
- //log.Println(ticks[i].Timestamp)
- //}
- if period == 0 { // tick
- if len(ticks) == 0 {
- return io.ErrUnexpectedEOF
- }
- fname := strings.Replace(refer, "csv.gz", "TK", -1)
- //err := market.SaveTicks(lmaxDB, refer, fname, ticks, insId)
- _, err := market.SaveTickEx(lmaxSaveDir, ticks, insId, true)
- if err != nil {
- return err
- }
- lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, ticks[len(ticks)-1].Timestamp, 0)
- log.Println("doSave", insId, ticks[0].Timestamp, 0, fname)
- } else { // candle
- pname := market.PeriodNameMap[period]
- fname := strings.Replace(refer, "csv.gz", pname, -1)
- if len(candles) == 0 {
- return io.ErrUnexpectedEOF
- }
- if period == market.D1 {
- bname := path.Base(refer)
- ss := strings.Split(bname, "-")
- if len(ss) < 8 {
- log.Println(bname)
- return errors.New("saveD1 error: " + fname)
- }
- dir := path.Dir(refer)
- for ; path.Base(dir) != ss[7]; dir = path.Dir(dir) {
- }
- fname = path.Join(dir, "candle.D1")
- dir = path.Join(lmaxSaveDir, insId)
- os.MkdirAll(dir, 0777)
- bname = fmt.Sprintf("%s.gz", market.PeriodNameMap[period])
- fname = path.Join(dir, bname)
- candles, _ = combinEx(fname, candles)
- //err := market.SaveH1OrD1(lmaxDB, refer, fname, candles, insId, market.D1)
- _, err := market.SaveCandlesEx(lmaxSaveDir, insId, candles, market.D1, true)
- if err != nil {
- return err
- }
- lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, candles[len(candles)-1].Timestamp, period)
- log.Println("doSave", insId, candles[0].Timestamp, period)
- return nil
- }
- //err := market.SaveCandles(lmaxDB, refer, fname, candles, insId, period)
- _, err := market.SaveCandlesEx(lmaxSaveDir, insId, candles, period, true)
- if err != nil {
- return err
- }
- lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, candles[len(candles)-1].Timestamp, period)
- log.Println("doSave", insId, candles[0].Timestamp, period)
- // 转换周期
- if period == market.M1 {
- buf, err := doConv(candles, insId, market.M5)
- if err != nil {
- return err
- }
- //refer := fname
- fname = strings.Replace(fname, "M1", "M5", -1)
- //err = market.SaveCandles(lmaxDB, refer, fname, buf, insId, market.M5)
- _, err = market.SaveCandlesEx(lmaxSaveDir, insId, buf, market.M5, true)
- if err != nil {
- return err
- }
- // log.Println("H1:", fname)
- buf, err = doConv(buf, insId, market.H1)
- if err != nil {
- return err
- }
- fname = path.Join(path.Dir(fname), "candle.H1")
- //return market.SaveH1OrD1(lmaxDB, refer, fname, buf, insId, market.H1)
- _, err = market.SaveCandlesEx(lmaxSaveDir, insId, buf, market.H1, true)
- return err
- }
- }
- return nil
- }
- // parse 从r读取所有的数据, 解析为[]tick或者[]Candle
- func parse(r io.Reader, period int) ([]market.Tick, []market.Candle, error) {
- zipr, err := gzip.NewReader(r)
- if err != nil {
- return nil, nil, err
- }
- candles := []market.Candle{}
- ticks := []market.Tick{}
- first := true
- csvr := csv.NewReader(zipr)
- for {
- data, err := csvr.Read()
- if err != nil {
- if err != io.EOF {
- return nil, nil, err
- }
- break
- }
- if first {
- first = false
- continue
- }
- if len(data) > 0 && data[0] == "" {
- continue
- }
- if len(data) > 1 && data[1] == "" {
- continue
- }
- if period == 0 {
- tick, e := parseTick(data)
- if e != nil {
- log.Println(e, data)
- continue
- }
- ticks = append(ticks, *tick)
- } else {
- candle, e := parseCandle(data)
- if e != nil {
- log.Println(e, data)
- continue
- }
- candles = append(candles, *candle)
- }
- }
- return ticks, candles, nil
- }
- // parseTick解析一行csv数据为一个tick
- func parseTick(data []string) (*market.Tick, error) {
- tick := market.Tick{}
- if len(data) != 5 {
- return nil, errors.New("error data format.")
- }
- timestamp, err := strconv.ParseInt(data[0], 10, 64)
- if err != nil {
- return nil, err
- }
- tick.Timestamp = timestamp
- var ff [4]float64
- for i := 1; i < 5; i++ {
- f, err := strconv.ParseFloat(data[i], 64)
- if err != nil {
- return nil, err
- }
- ff[i-1] = f
- }
- tick.Price = ff[0] // bid price
- tick.Volume = ff[1] // bid qty
- tick.Bid[0] = ff[0]
- tick.Bid[1] = ff[1]
- tick.Ask[0] = ff[2]
- tick.Ask[1] = ff[3]
- return &tick, nil
- }
- // parseTick解析一行csv数据为一个candle
- func parseCandle(data []string) (*market.Candle, error) {
- if len(data) < 11 {
- return nil, errors.New("the csv field not match")
- }
- t, err := strconv.ParseInt(data[0], 10, 64)
- if err != nil {
- return nil, err
- }
- var ff [10]float64
- for i := 0; i < 10; i++ {
- ff[i], err = strconv.ParseFloat(data[i+1], 64)
- if err != nil {
- return nil, err
- }
- }
- return &market.Candle{
- Timestamp: t,
- Open: ff[0],
- High: ff[1],
- Low: ff[2],
- Close: ff[3],
- RealVolums: ff[4] + ff[5] + ff[6],
- TickVolums: ff[7] + ff[8] + ff[9],
- }, nil
- }
- func combinEx(filename string, candles []market.Candle) ([]market.Candle, error) {
- buf, err := market.ReadCandleFile(filename)
- if err != nil {
- return candles, err
- }
- candles = append(buf, candles[:]...)
- sort.Sort(byTime(candles))
- return candles, nil
- }
- // 周期为D1的只保存一个文件
- // 周期为H1的每个月保存一个文件
- // 其他周期(<H1),每天保存一个文件
- // func saveH1OrD1(db *market.MyDB, refer, filename string, candles []market.Candle, insId string, period int) error {
- // buf, _ := combin(filename, candles)
- // return market.SaveCandles(db, refer, filename, buf, insId, period)
- // }
|