// history.go package tick //import "log" import "sync" import "sort" import "time" import "tickserver/server/market" import "net/http" import "net/url" import "path" import "io" import "os" import "compress/gzip" import "encoding/binary" import "tickserver/framework/base" import "unsafe" import "strings" type ParseFileInfo struct { fname string begtime int64 } type FileCandleMaker struct { candleGenerators []*base.Candle ohlcs []base.Ohlc candless [][]market.Candle dayLasts []int64 } type CandleMaker struct { gds *GeneralDS typ string typId int dataDir string url string fileserver string db *market.MyDB client *http.Client hiss []TickIndex hmu sync.Mutex files []ParseFileInfo fmu sync.Mutex m2ch chan *Market2 fileCandleMakersMap map[string]*FileCandleMaker tmpFileNameMap map[string]int } var hisTable = "history" var periodSet = []int{market.M1, market.M5, market.H1, market.D1} type byHisInfo []TickIndex func (a byHisInfo) Len() int { return len(a) } func (a byHisInfo) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byHisInfo) Less(i, j int) bool { return a[i].Begtime < a[j].Begtime } func (cm *CandleMaker) run() { lasttime, err := cm.db.GetHisLastTime(hisTable, cm.typ) if err != nil { //log.Println(err) return } cm.fileCandleMakersMap = make(map[string]*FileCandleMaker) cm.tmpFileNameMap = make(map[string]int) cm.m2ch = make(chan *Market2, 20480) go cm.makeCandle() go cm.parse() go cm.download() for { hiss, err := cm.getHisList(lasttime) if err != nil { //log.Println(err) } cm.hmu.Lock() cm.hiss = append(cm.hiss, hiss...) sort.Sort(byHisInfo(cm.hiss)) cm.hmu.Unlock() hisnum := len(cm.hiss) if hisnum > 0 { lasttime = cm.hiss[hisnum-1].Begtime + 1 } time.Sleep(time.Minute * 1) } } func (cm *CandleMaker) getHisList(lasttime int64) ([]TickIndex, error) { var hislist []TickIndex var offset int num := 1000 for num >= 1000 { req := &DownloadRequest{Type: cm.typ, Start: lasttime, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"} //log.Println("history", req) body, err := httpReq(cm.client, "history", cm.url, req) if err != nil { //log.Println("httpReq", err) return hislist, err } var ticks []TickIndex _, err = decodeResp(body, &ticks) if err != nil { //log.Println("decodeResp", err) return hislist, err } //log.Println("history num:", len(ticks)) hislist = append(hislist, ticks...) //log.Println(ticks) num = len(ticks) offset += num } return hislist, nil } func (cm *CandleMaker) download() { for { bHas := false var ti TickIndex cm.hmu.Lock() if len(cm.hiss) > 0 { ti = cm.hiss[0] bHas = true } cm.hmu.Unlock() if bHas { fname, err := cm.downloadOne(ti) if err == nil { cm.hmu.Lock() cm.hiss = cm.hiss[1:] cm.hmu.Unlock() //log.Println("history download:", fname) cm.fmu.Lock() cm.files = append(cm.files, ParseFileInfo{fname: fname, begtime: ti.Begtime}) cm.fmu.Unlock() } else { //log.Println("history download:", ti, err) time.Sleep(1 * time.Minute) } } else { time.Sleep(1 * time.Second) } } } func (cm *CandleMaker) downloadOne(ti TickIndex) (string, error) { u, err := url.Parse(ti.Path) if err != nil { ti.Path = "http://" + cm.fileserver + ti.Path } else { if u.Scheme == "" { u.Scheme = "http" } if u.Host == "" { u.Host = cm.fileserver } ti.Path = u.String() } //u = ti.Path //u = strings.Replace(u, "fzm", "", 1) res, err := http.Get(ti.Path) if err != nil { return "", err } defer res.Body.Close() surl, err := url.Parse(ti.Path) if err != nil { return "", err } fname := path.Join(cm.dataDir, "tmp", surl.Path) dir := path.Dir(fname) os.MkdirAll(dir, 0777) w, err := os.Create(fname) if err != nil { return "", err } //log.Println(ti.Path) defer w.Close() _, err = io.Copy(w, res.Body) if err != nil { return "", err } return fname, nil } func (cm *CandleMaker) parse() { // 保存数据到文件 ticker := time.Tick(time.Second * 30) for t := range ticker { if t.Hour() == 0 && t.Minute() == 30 { // 8:30点时保存 if len(cm.files) > 0 { for len(cm.files) > 0 { bHas := false var pfi ParseFileInfo cm.fmu.Lock() if len(cm.files) > 0 { pfi = cm.files[0] bHas = true } cm.fmu.Unlock() if bHas { err := cm.ParseOne(pfi.fname) cm.fmu.Lock() cm.files = cm.files[1:] cm.fmu.Unlock() if err != nil { os.Rename(pfi.fname, pfi.fname+".bad") //log.Println("history parse:", len(cm.files), pfi.fname, err) } else { os.Remove(pfi.fname) //log.Println("history parse:", len(cm.files), pfi.fname) if len(cm.files) == 0 { err = cm.db.UpdateHisLastTime(hisTable, cm.typ, pfi.begtime+1) //tks[len(tks)-1].Timestamp if err != nil { //log.Println("cm.db.UpdateHisLastTime", err, cm.typ, pfi.begtime) //tks[len(tks)-1].Timestamp } } } } } //notify no more data cm.m2ch <- nil } } } } func (cm *CandleMaker) ParseOne(fname string) error { f, err := os.Open(fname) if err != nil { return err } defer f.Close() gr, err := gzip.NewReader(f) if err != nil { return err } defer gr.Close() var lasttime, readonly, lastcount int32 err = binary.Read(gr, binary.LittleEndian, &lasttime) if err != nil { return err } err = binary.Read(gr, binary.LittleEndian, &readonly) if err != nil { return err } err = binary.Read(gr, binary.LittleEndian, &lastcount) if err != nil { return err } for { m := &Market2{} err := binary.Read(gr, binary.LittleEndian, m) if err != nil { if err != io.EOF { return err } else { return nil } } if m.Type != int64(cm.typId) { //log.Println("history wrongggggggggg typ", m.Type, cm.typId) continue } cm.m2ch <- m } return nil } func (cm *CandleMaker) makeCandle() { dir := path.Join(cm.dataDir, cm.typ) for { m := <-cm.m2ch if m == nil { for k, v := range cm.fileCandleMakersMap { for i, _ := range v.candless { if periodSet[i] != market.D1 { for n, tmpcandle := range v.candless[i] { day := tmpcandle.Timestamp / (1000 * 3600 * 24) if day != v.dayLasts[i] && v.dayLasts[i] != 0 { fname, err := market.SaveCandlesTmp(dir, k, v.candless[i][:n], periodSet[i], false) if err != nil { //log.Println(fname, err) } else { cm.tmpFileNameMap[fname] = 0 //log.Println(fname) } v.candless[i] = v.candless[i][n:] } v.dayLasts[i] = day } } fname, err := market.SaveCandlesTmp(dir, k, v.candless[i], periodSet[i], false) if err != nil { //log.Println(fname, err) } else { if periodSet[i] != market.D1 { cm.tmpFileNameMap[fname] = 0 } //log.Println(fname) } v.candless[i] = nil } } for k, _ := range cm.tmpFileNameMap { fname := strings.TrimSuffix(k, ".tmp") //log.Println(fname, k) if fname != k { if _, err := os.Stat(fname); os.IsNotExist(err) { if _, err := os.Stat(k); err == nil { for err = os.Rename(k, fname); err != nil; err = os.Rename(k, fname) { time.Sleep(time.Second) } } } } } cm.tmpFileNameMap = nil cm.tmpFileNameMap = make(map[string]int) continue } insIdStr := cm.gds.getInsIdStr(m.InsId) if insIdStr == "" { //log.Println("wrong insId:", cm.typ, m.InsId) continue } fcm, ok := cm.fileCandleMakersMap[insIdStr] if !ok { candleGenerators := make([]*base.Candle, len(periodSet)) ohlcs := make([]base.Ohlc, len(periodSet)) for i, period := range periodSet { candleGenerators[i], _ = base.NewCandle(period, 2, nil, 0) if strings.HasPrefix(insIdStr, Ctp) { candleGenerators[i].Set(base.CANDLE_AUTOCOMPLETE_MAX, 1) } ohlcs[i] = base.Ohlc{} } fcm = &FileCandleMaker{ candleGenerators: candleGenerators[:], ohlcs: ohlcs[:], candless: make([][]market.Candle, len(periodSet)), dayLasts: make([]int64, len(periodSet)), } cm.fileCandleMakersMap[insIdStr] = fcm } for i, candleGenerator := range fcm.candleGenerators { tg := Mk2Tg(*m) num := candleGenerator.UpdateTick((*base.Tick)(unsafe.Pointer(&tg))) var tmpcandles []market.Candle if num == 0 { candleGenerator.Next(&fcm.ohlcs[i]) ohlcGo := fcm.ohlcs[i].ToGOStruct() tmpcandles = append(tmpcandles, OhlcGo2Candle(ohlcGo)) } else if num > 0 { for j := 0; j < num; j++ { candleGenerator.Next(&fcm.ohlcs[i]) ohlcGo := fcm.ohlcs[i].ToGOStruct() tmpcandles = append(tmpcandles, OhlcGo2Candle(ohlcGo)) } } else { //log.Println("tick error.") } for _, tmpcandle := range tmpcandles { if periodSet[i] != market.D1 { day := tmpcandle.Timestamp / (1000 * 3600 * 24) if day != fcm.dayLasts[i] && fcm.dayLasts[i] != 0 { fname, err := market.SaveCandlesTmp(dir, insIdStr, fcm.candless[i], periodSet[i], false) if err != nil { //log.Println(fname, err) } else { cm.tmpFileNameMap[fname] = 0 //log.Println(fname) } fcm.candless[i] = nil } fcm.dayLasts[i] = day } if len(fcm.candless[i]) > 0 && fcm.candless[i][len(fcm.candless[i])-1].Timestamp == tmpcandle.Timestamp { fcm.candless[i][len(fcm.candless[i])-1] = tmpcandle } else { fcm.candless[i] = append(fcm.candless[i], tmpcandle) } } } } } func Mk2Tg(mk Market2) base.TickGo { var tg base.TickGo tg.Time = int32(mk.Timestamp / 1000) tg.Ms = int16(mk.Timestamp % 1000) tg.Symbol = 0 tg.Bid = float32(mk.LastPrice) //tk.Bid[0] //tg.Ask = float32(tk.Price) //tk.Ask[0] tg.Bidv = float32(mk.LastVolume) //tk.Bid[1] //tg.Askv = int32(tk.Volume) //tk.Ask[1] return tg } func Mk2Tk(mk *Market2) market.Tick { var tk market.Tick tk.Timestamp = mk.Timestamp tk.Price = mk.LastPrice tk.Volume = mk.LastVolume tk.Ask[0] = mk.Asks[0][0] tk.Ask[1] = mk.Asks[0][1] tk.Bid[0] = mk.Bids[0][0] tk.Bid[1] = mk.Bids[0][1] return tk } func OhlcGo2Candle(ohlcGo base.OhlcGo) market.Candle { var c market.Candle c.Timestamp = int64(ohlcGo.Time) * 1000 c.Open = ohlcGo.Open c.High = ohlcGo.High c.Low = ohlcGo.Low c.Close = ohlcGo.Close c.TickVolums = float64(ohlcGo.TickVolumn) c.RealVolums = float64(ohlcGo.RealVolumn) return c }