// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick // 本文件实现lmax下载的历史行情数据的下载和解析,周期转换和保存 import ( "compress/gzip" "encoding/csv" "errors" "fmt" "io" "log" "net/url" "os" "path" "sort" "strconv" "strings" "sync" "time" "tickserver/api/lmaxapi" "tickserver/api/lmaxapi/request" "tickserver/api/lmaxapi/response" "tickserver/framework/event" "tickserver/server/market" ) var lmaxUser = "wave2907" var lmaxPassWord = "Tg417396" var lmaxUrl = "https://trade.lmaxtrader.com" var lmaxInsTable = "Lmax_inss" var lmaxSaveDir string var lmaxDB *market.MyDB var insMap map[string]*market.Instrument var errcount int64 var lmaxHisDownCh = make(chan int, 1) var lmaxHisParseCh = make(chan int, 1) var lmaxInsStartMap = make(map[string]map[int]int64) //var lmaxInsPublisher event.EventPublisher // 实时行情数据发布器 var lmaxInsPublisher2 event.EventPublisher // 实时行情数据发布器 type byTime []market.Candle func (a byTime) Len() int { return len(a) } func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byTime) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp } // 设计: 4个步骤: 订阅==>下载==>解析==>保存 // 1, 单个账号轮询下载所有的instrument // 2, 分别轮询tick, M1和D1 // 3, 保证订阅返回文件列表的有序 // 4, 可同时下载几组返回的订阅列表 // 5, 每个返回的订阅文件列表依次下载, 不允许下载失败(失败则一直循环请求下载) // 6, 每组列表下载完毕(全部下载完成), 去解析, 解析的文件列表保证时序和完整 // 7, 可以同时解析多组文件(每组文件) // 8, 对日线和小时线数据合并==>一个产品只有一个日线文件, 一个产品每月一个小时线文件 // 9, 历史数据==>binary==>gzip编码压缩保存 // 10, 把每个文件的信息写入数据库(refer, 路径, 岂止时间和K线数量) func doErrCode(s *lmaxapi.Session, err error) { //1. 处理下面的事件 //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启 //1.2 session 过期,发生403错误,必须重新登录 //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1 // 这个时候调用 session.StreamClose() 重新启动stream operr, ok := err.(*lmaxapi.OpError) log.Println("operr:", operr) if !ok { return } //1.2 if operr.Code == 403 { log.Println("stop session") s.Stop() return } //1.1 and 1.3 //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session if operr.Op == "Stream" { log.Println("stop stream") if operr.Code == 0 { time.Sleep(1 * time.Second) } s.StopStream() // ss.Stop() return } } func regStreamError(ss *lmaxapi.Session) { // 注册数据流失败处理函数 ss.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) { doErrCode(s, err) }) } func onInstrument(value *response.Instrument) { ins := &market.Instrument{ Id: market.LmaxPrefix + strconv.Itoa(int(value.Id)), Name: value.Name, ExId: market.Lmax, Typ: market.Forex, PriceInc: value.PriceIncrement, Margin: value.MarginRate, StartTime: value.StartTime.Unix() * 1000, // ms } insMap[ins.Id] = ins // log.Println(ins.Id, ins.Name, ins.ExId) //lmaxInsPublisher.Publish(ins) // lds.insPublisher2.Publish(ins) lmaxDB.InsertIns(ins, lmaxInsTable) } func login(name, password, typ, url string) (*lmaxapi.Session, error) { log.Println("LmaxDS.login:", name) lmaxapi.SetBaseUrl(url) req := request.NewLoginRequest(name, password, typ) ss, err := lmaxapi.Login(req) if err != nil { s := err.Error() if len(s) > 32 { s = s[:32] } log.Println("lmaxapi.Login error:", s) return nil, err } log.Println(name, "login OK!") return ss, nil } func DownloadLmax(dir string, db *market.MyDB) { var chkMu sync.Mutex chkSS := false insMap = make(map[string]*market.Instrument) lmaxDB = db lmaxSaveDir = dir //此 goroutine用来触发下载历史数据, 每分钟下载一个 go func() { //insMap := make(map[string]*market.Instrument) //lmaxInsPublisher.Event().Attach(func(v interface{}) error { //ins := v.(*market.Instrument) //insMap[ins.Id] = ins //return nil //}) for { <-lmaxHisDownCh <-lmaxHisParseCh for _, ins := range insMap { //if ins.Id != "lmax_4001" { //continue //} //log.Println("ins.Id", ins.Id) chkMu.Lock() if !chkSS { time.Sleep(time.Minute * 10) // 防止不断重复发送下载请求 } chkMu.Unlock() //if ins.Id == "lmax_4001" { lmaxInsPublisher2.Publish(ins) //} time.Sleep(time.Second * 10) } time.Sleep(time.Second * 10) } }() fn := func() { for { chkMu.Lock() chkSS = false chkMu.Unlock() // 登录产生新的Session ss, err := login(lmaxUser, lmaxPassWord, request.ProductType.CFD_LIVE, lmaxUrl) if err != nil { time.Sleep(time.Second * 10) continue } ss.RegisterSessionDisconnected(func(s *lmaxapi.Session) { log.Println("session disconnected. STOP session!") s.Stop() }) regStreamError(ss) downloadRun(ss) // 历史数据 ss.LoadAllInstruments(func(value *response.Instrument) { onInstrument(value) //ss.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), DefaultSubscribeCB) }) ss.Wait() // 设置心跳 ss.HeartbeatTimeout(5 * time.Second) // 会话启动 chkMu.Lock() chkSS = true chkMu.Unlock() ss.Start() // 有错误时跳出Start重启 log.Println("lmax session RESTART !!!") } } fn() } func ifDuplicateDownload(insId string, period int, start int64) bool { stmap, ok := lmaxInsStartMap[insId] if ok { st, ok1 := stmap[period] if ok1 { if st == start { return true } else { lmaxInsStartMap[insId][period] = start return false } } else { lmaxInsStartMap[insId][period] = start return false } } else { lmaxInsStartMap[insId] = make(map[int]int64) lmaxInsStartMap[insId][period] = start return false } } func downloadRun(ss *lmaxapi.Session) { dlParser := newParser(1) // 解析器 dlTask := newDownTask(lmaxDB, dlParser, lmaxSaveDir, "", 1) // 下载任务器 ss.RegisterHistoricMarketDataEvent(func(s *lmaxapi.Session, event *response.HistoricMarketDataEvent) { if len(event.Urls) == 0 { log.Println("event.Urls:", len(event.Urls)) return } //for i, v := range event.Urls { //log.Println("url:", i, v) //} dlTask.downloadUrls(ss, event.Urls) }) lmaxInsPublisher2.Event().Attach(func(v interface{}) error { ins := v.(*market.Instrument) //insId := market.LmaxPrefix + strconv.Itoa(int(4001)) //if ins.Id != insId { //return nil //} lmaxId := market.RealInsId(ins.Id) ps := []int{0, market.M1, market.D1} for _, period := range ps { downloadInsHis(ss, ins.Id, lmaxId, period) } return nil }) } func downloadInsHis(ss *lmaxapi.Session, insId, lmaxId string, period int) { // log.Println("downloadInsHis", insId, lmaxId, period) switch period { case 0: downloadInsTicks(ss, insId, lmaxId) default: downloadInsCandles(ss, insId, lmaxId, period) } } // downloadInsTicks 下载tick数据 // update == true 指示是否下载最新数据 func downloadInsTicks(ss *lmaxapi.Session, insId, lmaxId string) { st := insMap[insId].StartTime / 1000 //if market.Debug { //st = time.Date(2015, 1, 1, 0, 0, 0, 0, time.Local).Unix() //} //r, err := lds.db.GetLastTime(insId, 0) //if err == nil && r != nil { //log.Println("downloadInsTicks", insId, err, st, r.StartTime) //st = r.StartTime / 1000 //} startTime, err := lmaxDB.GetInsStartTime(lmaxInsTable, insId, 0) startTime /= 1000 log.Println("downloadInsTicks", insId, err, st, startTime) if err == nil && startTime > st { st = startTime } //if time.Now().Unix()-st < int64(market.D1) { //market.M1 //return //} if ifDuplicateDownload(insId, 0, st) { return } log.Println("downloadInsTicks:", insId, lmaxId, st) iid, _ := strconv.Atoi(lmaxId) req := request.NewHistoricSubscriptionRequest() ss.Subscribe(req, func(err error) { if err != nil { // log.Println("downloadInsTicks Subscribe history request error:", err) doErrCode(ss, err) // 错误处理 return } hreq := request.NewTopOfBookHistoricRequest(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV") log.Println(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV") ss.RequestHistoricMarketData(hreq, func(err error) { if err != nil { log.Println("downloadInsTicks RequestHistoricMarketData error", err) } }) return }) } // downloadInsCandles 下载历史K线数据 // update == true 指示是否下载最新数据 func downloadInsCandles(ss *lmaxapi.Session, insId, lmaxId string, period int) { ps := "MINUTE" if period == market.D1 { ps = "DAY" } st := insMap[insId].StartTime / 1000 //if market.Debug { //st = time.Date(2015, 1, 1, 0, 0, 0, 0, time.Local).Unix() //} //r, err := lds.db.GetLastTime(insId, period) //if err == nil && r != nil { //log.Println("downloadInsCandles", insId, err, st, r.StartTime, r.EndTime, period) //st = r.EndTime / 1000 //if period == market.M1 { //st = r.StartTime / 1000 //} //} startTime, err := lmaxDB.GetInsStartTime(lmaxInsTable, insId, period) startTime /= 1000 log.Println("downloadInsCandles", insId, err, st, startTime, period) if err == nil && startTime > st { st = startTime } //if time.Now().Unix()-st < market.D1 { //int64(period) //return //} if ifDuplicateDownload(insId, period, st) { return } log.Println("lds.downloadInsCandles", insId, lmaxId, ps, st) iid, _ := strconv.Atoi(lmaxId) req := request.NewHistoricSubscriptionRequest() ss.Subscribe(req, func(err error) { if err != nil { // log.Println("downloadInsCandles Subscribe error:", err) doErrCode(ss, err) // 错误处理 return } hreq := request.NewAggregateHistoricRequest(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV", ps, []string{"BID"}) // 使用卖价 log.Println(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV", ps, []string{"BID"}) ss.RequestHistoricMarketData(hreq, func(err error) { if err != nil { log.Println("downloadInsCandles RequestHistoricMarketData error:", err) } }) }) } type dlWork struct { url string ss *lmaxapi.Session } // 下载任务 type downTask struct { in chan []*dlWork db *market.MyDB p *parser dirMu sync.Mutex dir string dir1 string mu sync.Mutex wss [][]*dlWork } func (task *downTask) addws(ws []*dlWork) bool { task.mu.Lock() defer task.mu.Unlock() for _, s := range task.wss { for _, w := range s { if w.url == ws[0].url { _, _, _, err := task.parseUrl(w.url) if err != nil { return false } //r, err := task.db.GetLastTime(insId, period) //if err != nil { //return false //} //if r != nil && fname == r.Refer { //// log.Println("download newest url:", w.url) //return true //} return false } } } task.wss = append(task.wss, ws) return true } // downloadUrls 下载一种instrument的url列表 // url列表是以时间顺序的, 下载完毕后, 要保证顺序给到解析器解析 func (task *downTask) downloadUrls(ss *lmaxapi.Session, urls []string) { ws := make([]*dlWork, len(urls)) for i := 0; i < len(urls); i++ { w := &dlWork{urls[i], ss} ws[i] = w } //if task.addws(ws) { task.in <- ws //} } func newDownTask(db *market.MyDB, p *parser, dir, dir1 string, max int) *downTask { task := &downTask{ in: make(chan []*dlWork, 1024), db: db, p: p, dir: dir, dir1: dir1, } // 并发下载 for i := 0; i < max; i++ { go task.dowork() } return task } func (task *downTask) dowork() { for { var ws []*dlWork select { case ws = <-task.in: default: select { case lmaxHisDownCh <- 1: default: } time.Sleep(time.Second * 1) continue } //ws := <-task.in //pws := []*parseWork{} wCount := len(ws) //pwCh := make(chan *parseWork, wCount) //errCh := make(chan error, wCount) resultCh := make(chan int, wCount) wsCh := make(chan *dlWork, 1) log.Println("download begin") go func() { for _, w := range ws { wsCh <- w } }() for index := 0; index < 4; index++ { go func() { for { w := <-wsCh i := 0 for { log.Println("down:", w.url) pw, err := task.downloadOne(w.ss, w.url) if err != nil { log.Println("down error:", err, w.url) i++ if i == 3 { resultCh <- 0 //errCh <- err break } continue } var print_url, print_fname string if w != nil { print_url = w.url } if pw != nil { print_fname = pw.fname } log.Println("down ok:", print_url, print_fname) //pwCh <- pw if pw != nil { pws := []*parseWork{} pws = append(pws, pw) task.p.parse(pws) pws = nil } resultCh <- 1 break } } }() } //bErr := false //var pw *parseWork //var err error for count := 0; count < len(ws); count++ { <-resultCh //select { //case pw = <-pwCh: //if pw != nil { //pws = append(pws, pw) //} //case err = <-errCh: //bErr = true //} } //close(wsCh) log.Println("download end") //if bErr { //log.Println("downloadOne error:", err, len(pws), len(ws)) ////return //} // 解析一组文件 //task.p.parse(pws) } } /*func (task *downTask) dowork() { for { ws := <-task.in pws := []*parseWork{} for _, w := range ws { i := 0 for { pw, err := task.downloadOne(w.ss, w.url) if err != nil { i++ if i == 3 { log.Printf("downloadOne error:", err) return } continue } if pw != nil { pws = append(pws, pw) } break } } // 解析一组文件 task.p.parse(pws) } }*/ func (task *downTask) parseUrl(u string) (fname, insId string, period int, e error) { surl, err := url.Parse(u) if err != nil { e = err return } bname := path.Base(surl.Path) ss := strings.Split(bname, "-") if len(ss) < 8 { e = errors.New("Url is error:") return } // 根据url文件名解析出insId和period insId = market.LmaxPrefix + ss[7] if strings.Contains(bname, "tick") { period = 0 } else if strings.Contains(bname, "minute") { period = market.M1 } else if strings.Contains(bname, "day") { period = market.D1 } else { log.Println("task.downloadOne error: url is error:", u) } fname = task.dir + surl.Path return } func (task *downTask) downloadOne(session *lmaxapi.Session, u string) (*parseWork, error) { fname, insId, period, err := task.parseUrl(u) if err != nil { return nil, err } if _, err := os.Stat(fname); !os.IsNotExist(err) { return &parseWork{fname, insId, period}, nil //return nil, nil // 文件存在 //r, err := task.db.GetLastTime(insId, period) //if err != nil { //return &parseWork{fname, insId, period}, nil //} //if r != nil && fname != r.Refer { //// 并不是最新的文件, 那么无需下载 //return nil, nil //} } resp, err := session.OpenUrl(u, nil) if err != nil { return nil, err } defer resp.Body.Close() // log.Println(u) // 保存到本地 fpath := path.Dir(fname) os.MkdirAll(fpath, 0777) w, err := os.Create(fname) if err != nil { return nil, err } n, err := io.Copy(w, resp.Body) w.Close() if err != nil { if err != io.EOF { return nil, err } } if n == 0 { return nil, errors.New("io.Copy write 0 byte: " + u) } return &parseWork{fname, insId, period}, nil } type parseWork struct { fname string insId string period int } // 解析器 type parser struct { pwch chan []*parseWork // 缓存解析任务 mu sync.Mutex pwss [][]*parseWork } func newParser(max int) *parser { fch := make(chan []*parseWork, 32) p := &parser{pwch: fch} // 指定数量的并发解析 for i := 0; i < max; i++ { go p.doParse() } return p } // 避免重复解析 func (p *parser) addpws(pws []*parseWork) bool { p.mu.Lock() defer p.mu.Unlock() for _, ws := range p.pwss { for _, pw := range ws { if pws[0].fname == pw.fname { //r, err := lmaxDB.GetLastTime(pw.insId, pw.period) //if err != nil { //return false //} //if r != nil && pw.fname == r.Refer { //return true //} return false } } } p.pwss = append(p.pwss, pws) return true } func (p *parser) parse(pws []*parseWork) { if len(pws) == 0 { return } //if p.addpws(pws) { p.pwch <- pws //} } func (p *parser) doParse() { for { var pws []*parseWork select { case pws = <-p.pwch: default: select { case lmaxHisParseCh <- 1: default: } time.Sleep(time.Second * 1) continue } //pws := <-p.pwch for _, w := range pws { log.Println("parse:", w.fname) ticks, candles, err := parseFile(w.fname, w.period) if err != nil { log.Println("lmax parseFile error:", err, w.fname) badname := w.fname + ".bad" os.Remove(badname) os.Rename(w.fname, badname) continue } log.Println("save:", w.fname) err = p.doSave(w.fname, w.insId, w.period, ticks, candles) if err != nil { log.Println("lmax doSave error:", err, w.fname) } } } } // parseFile 解析文件, refer表明文件来源 func parseFile(refer string, period int) ([]market.Tick, []market.Candle, error) { // 解析为[]Tick或者[]Candle f, err := os.Open(refer) if err != nil { return nil, nil, errors.New("parseFile error: " + err.Error()) } defer f.Close() return parse(f, period) } func ParseFile(refer string, period int) ([]market.Tick, []market.Candle, error) { return parseFile(refer, period) } func doConv(candles []market.Candle, insId string, period int) ([]market.Candle, error) { r := market.NewCandleBuf(candles) buf, err := market.ConvPeriod(r, insId, period) if err != nil { log.Println(err) } if len(buf) == 0 { return nil, errors.New("doConv error: result buf len is ZERO") } return buf, nil } func (p *parser) doSave(refer, insId string, period int, ticks []market.Tick, candles []market.Candle) error { // 保存到本地 //for i := 0; i < len(ticks); i++ { //log.Println(ticks[i].Timestamp) //} if period == 0 { // tick if len(ticks) == 0 { return io.ErrUnexpectedEOF } fname := strings.Replace(refer, "csv.gz", "TK", -1) //err := market.SaveTicks(lmaxDB, refer, fname, ticks, insId) _, err := market.SaveTickEx(lmaxSaveDir, ticks, insId, true) if err != nil { return err } lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, ticks[len(ticks)-1].Timestamp, 0) log.Println("doSave", insId, ticks[0].Timestamp, 0, fname) } else { // candle pname := market.PeriodNameMap[period] fname := strings.Replace(refer, "csv.gz", pname, -1) if len(candles) == 0 { return io.ErrUnexpectedEOF } if period == market.D1 { bname := path.Base(refer) ss := strings.Split(bname, "-") if len(ss) < 8 { log.Println(bname) return errors.New("saveD1 error: " + fname) } dir := path.Dir(refer) for ; path.Base(dir) != ss[7]; dir = path.Dir(dir) { } fname = path.Join(dir, "candle.D1") dir = path.Join(lmaxSaveDir, insId) os.MkdirAll(dir, 0777) bname = fmt.Sprintf("%s.gz", market.PeriodNameMap[period]) fname = path.Join(dir, bname) candles, _ = combinEx(fname, candles) //err := market.SaveH1OrD1(lmaxDB, refer, fname, candles, insId, market.D1) _, err := market.SaveCandlesEx(lmaxSaveDir, insId, candles, market.D1, true) if err != nil { return err } lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, candles[len(candles)-1].Timestamp, period) log.Println("doSave", insId, candles[0].Timestamp, period) return nil } //err := market.SaveCandles(lmaxDB, refer, fname, candles, insId, period) _, err := market.SaveCandlesEx(lmaxSaveDir, insId, candles, period, true) if err != nil { return err } lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, candles[len(candles)-1].Timestamp, period) log.Println("doSave", insId, candles[0].Timestamp, period) // 转换周期 if period == market.M1 { buf, err := doConv(candles, insId, market.M5) if err != nil { return err } //refer := fname fname = strings.Replace(fname, "M1", "M5", -1) //err = market.SaveCandles(lmaxDB, refer, fname, buf, insId, market.M5) _, err = market.SaveCandlesEx(lmaxSaveDir, insId, buf, market.M5, true) if err != nil { return err } // log.Println("H1:", fname) buf, err = doConv(buf, insId, market.H1) if err != nil { return err } fname = path.Join(path.Dir(fname), "candle.H1") //return market.SaveH1OrD1(lmaxDB, refer, fname, buf, insId, market.H1) _, err = market.SaveCandlesEx(lmaxSaveDir, insId, buf, market.H1, true) return err } } return nil } // parse 从r读取所有的数据, 解析为[]tick或者[]Candle func parse(r io.Reader, period int) ([]market.Tick, []market.Candle, error) { zipr, err := gzip.NewReader(r) if err != nil { return nil, nil, err } candles := []market.Candle{} ticks := []market.Tick{} first := true csvr := csv.NewReader(zipr) for { data, err := csvr.Read() if err != nil { if err != io.EOF { return nil, nil, err } break } if first { first = false continue } if len(data) > 0 && data[0] == "" { continue } if len(data) > 1 && data[1] == "" { continue } if period == 0 { tick, e := parseTick(data) if e != nil { log.Println(e, data) continue } ticks = append(ticks, *tick) } else { candle, e := parseCandle(data) if e != nil { log.Println(e, data) continue } candles = append(candles, *candle) } } return ticks, candles, nil } // parseTick解析一行csv数据为一个tick func parseTick(data []string) (*market.Tick, error) { tick := market.Tick{} if len(data) != 5 { return nil, errors.New("error data format.") } timestamp, err := strconv.ParseInt(data[0], 10, 64) if err != nil { return nil, err } tick.Timestamp = timestamp var ff [4]float64 for i := 1; i < 5; i++ { f, err := strconv.ParseFloat(data[i], 64) if err != nil { return nil, err } ff[i-1] = f } tick.Price = ff[0] // bid price tick.Volume = ff[1] // bid qty tick.Bid[0] = ff[0] tick.Bid[1] = ff[1] tick.Ask[0] = ff[2] tick.Ask[1] = ff[3] return &tick, nil } // parseTick解析一行csv数据为一个candle func parseCandle(data []string) (*market.Candle, error) { if len(data) < 11 { return nil, errors.New("the csv field not match") } t, err := strconv.ParseInt(data[0], 10, 64) if err != nil { return nil, err } var ff [10]float64 for i := 0; i < 10; i++ { ff[i], err = strconv.ParseFloat(data[i+1], 64) if err != nil { return nil, err } } return &market.Candle{ Timestamp: t, Open: ff[0], High: ff[1], Low: ff[2], Close: ff[3], RealVolums: ff[4] + ff[5] + ff[6], TickVolums: ff[7] + ff[8] + ff[9], }, nil } func combinEx(filename string, candles []market.Candle) ([]market.Candle, error) { buf, err := market.ReadCandleFile(filename) if err != nil { return candles, err } candles = append(buf, candles[:]...) sort.Sort(byTime(candles)) return candles, nil } // 周期为D1的只保存一个文件 // 周期为H1的每个月保存一个文件 // 其他周期(