123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- // history.go
- package tick
//import "log"
import (
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"path"
	"sort"
	"strings"
	"sync"
	"time"
	"unsafe"

	"tickserver/framework/base"
	"tickserver/server/market"
)
// ParseFileInfo identifies one downloaded history file that is queued for
// parsing.
type ParseFileInfo struct {
	fname   string // local path of the downloaded file
	begtime int64  // Begtime of the index entry the file was downloaded for
}
- type FileCandleMaker struct {
- candleGenerators []*base.Candle
- ohlcs []base.Ohlc
- candless [][]market.Candle
- dayLasts []int64
- }
- type CandleMaker struct {
- gds *GeneralDS
- typ string
- typId int
- dataDir string
- url string
- fileserver string
- db *market.MyDB
- client *http.Client
- hiss []TickIndex
- hmu sync.Mutex
- files []ParseFileInfo
- fmu sync.Mutex
- m2ch chan *Market2
- fileCandleMakersMap map[string]*FileCandleMaker
- tmpFileNameMap map[string]int
- }
- var hisTable = "history"
- var periodSet = []int{market.M1, market.M5, market.H1, market.D1}
- type byHisInfo []TickIndex
- func (a byHisInfo) Len() int { return len(a) }
- func (a byHisInfo) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
- func (a byHisInfo) Less(i, j int) bool { return a[i].Begtime < a[j].Begtime }
- func (cm *CandleMaker) run() {
- lasttime, err := cm.db.GetHisLastTime(hisTable, cm.typ)
- if err != nil {
- //log.Println(err)
- return
- }
- cm.fileCandleMakersMap = make(map[string]*FileCandleMaker)
- cm.tmpFileNameMap = make(map[string]int)
- cm.m2ch = make(chan *Market2, 20480)
- go cm.makeCandle()
- go cm.parse()
- go cm.download()
- for {
- hiss, err := cm.getHisList(lasttime)
- if err != nil {
- //log.Println(err)
- }
- cm.hmu.Lock()
- cm.hiss = append(cm.hiss, hiss...)
- sort.Sort(byHisInfo(cm.hiss))
- cm.hmu.Unlock()
- hisnum := len(cm.hiss)
- if hisnum > 0 {
- lasttime = cm.hiss[hisnum-1].Begtime + 1
- }
- time.Sleep(time.Minute * 1)
- }
- }
- func (cm *CandleMaker) getHisList(lasttime int64) ([]TickIndex, error) {
- var hislist []TickIndex
- var offset int
- num := 1000
- for num >= 1000 {
- req := &DownloadRequest{Type: cm.typ, Start: lasttime, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"}
- //log.Println("history", req)
- body, err := httpReq(cm.client, "history", cm.url, req)
- if err != nil {
- //log.Println("httpReq", err)
- return hislist, err
- }
- var ticks []TickIndex
- _, err = decodeResp(body, &ticks)
- if err != nil {
- //log.Println("decodeResp", err)
- return hislist, err
- }
- //log.Println("history num:", len(ticks))
- hislist = append(hislist, ticks...)
- //log.Println(ticks)
- num = len(ticks)
- offset += num
- }
- return hislist, nil
- }
- func (cm *CandleMaker) download() {
- for {
- bHas := false
- var ti TickIndex
- cm.hmu.Lock()
- if len(cm.hiss) > 0 {
- ti = cm.hiss[0]
- bHas = true
- }
- cm.hmu.Unlock()
- if bHas {
- fname, err := cm.downloadOne(ti)
- if err == nil {
- cm.hmu.Lock()
- cm.hiss = cm.hiss[1:]
- cm.hmu.Unlock()
- //log.Println("history download:", fname)
- cm.fmu.Lock()
- cm.files = append(cm.files, ParseFileInfo{fname: fname, begtime: ti.Begtime})
- cm.fmu.Unlock()
- } else {
- //log.Println("history download:", ti, err)
- time.Sleep(1 * time.Minute)
- }
- } else {
- time.Sleep(1 * time.Second)
- }
- }
- }
- func (cm *CandleMaker) downloadOne(ti TickIndex) (string, error) {
- u, err := url.Parse(ti.Path)
- if err != nil {
- ti.Path = "http://" + cm.fileserver + ti.Path
- } else {
- if u.Scheme == "" {
- u.Scheme = "http"
- }
- if u.Host == "" {
- u.Host = cm.fileserver
- }
- ti.Path = u.String()
- }
- //u = ti.Path
- //u = strings.Replace(u, "fzm", "", 1)
- res, err := http.Get(ti.Path)
- if err != nil {
- return "", err
- }
- defer res.Body.Close()
- surl, err := url.Parse(ti.Path)
- if err != nil {
- return "", err
- }
- fname := path.Join(cm.dataDir, "tmp", surl.Path)
- dir := path.Dir(fname)
- os.MkdirAll(dir, 0777)
- w, err := os.Create(fname)
- if err != nil {
- return "", err
- }
- //log.Println(ti.Path)
- defer w.Close()
- _, err = io.Copy(w, res.Body)
- if err != nil {
- return "", err
- }
- return fname, nil
- }
- func (cm *CandleMaker) parse() {
- // 保存数据到文件
- ticker := time.Tick(time.Second * 30)
- for t := range ticker {
- if t.Hour() == 0 && t.Minute() == 30 { // 8:30点时保存
- if len(cm.files) > 0 {
- for len(cm.files) > 0 {
- bHas := false
- var pfi ParseFileInfo
- cm.fmu.Lock()
- if len(cm.files) > 0 {
- pfi = cm.files[0]
- bHas = true
- }
- cm.fmu.Unlock()
- if bHas {
- err := cm.ParseOne(pfi.fname)
- cm.fmu.Lock()
- cm.files = cm.files[1:]
- cm.fmu.Unlock()
- if err != nil {
- os.Rename(pfi.fname, pfi.fname+".bad")
- //log.Println("history parse:", len(cm.files), pfi.fname, err)
- } else {
- os.Remove(pfi.fname)
- //log.Println("history parse:", len(cm.files), pfi.fname)
- if len(cm.files) == 0 {
- err = cm.db.UpdateHisLastTime(hisTable, cm.typ, pfi.begtime+1) //tks[len(tks)-1].Timestamp
- if err != nil {
- //log.Println("cm.db.UpdateHisLastTime", err, cm.typ, pfi.begtime) //tks[len(tks)-1].Timestamp
- }
- }
- }
- }
- }
- //notify no more data
- cm.m2ch <- nil
- }
- }
- }
- }
- func (cm *CandleMaker) ParseOne(fname string) error {
- f, err := os.Open(fname)
- if err != nil {
- return err
- }
- defer f.Close()
- gr, err := gzip.NewReader(f)
- if err != nil {
- return err
- }
- defer gr.Close()
- var lasttime, readonly, lastcount int32
- err = binary.Read(gr, binary.LittleEndian, &lasttime)
- if err != nil {
- return err
- }
- err = binary.Read(gr, binary.LittleEndian, &readonly)
- if err != nil {
- return err
- }
- err = binary.Read(gr, binary.LittleEndian, &lastcount)
- if err != nil {
- return err
- }
- for {
- m := &Market2{}
- err := binary.Read(gr, binary.LittleEndian, m)
- if err != nil {
- if err != io.EOF {
- return err
- } else {
- return nil
- }
- }
- if m.Type != int64(cm.typId) {
- //log.Println("history wrongggggggggg typ", m.Type, cm.typId)
- continue
- }
- cm.m2ch <- m
- }
- return nil
- }
- func (cm *CandleMaker) makeCandle() {
- dir := path.Join(cm.dataDir, cm.typ)
- for {
- m := <-cm.m2ch
- if m == nil {
- for k, v := range cm.fileCandleMakersMap {
- for i, _ := range v.candless {
- if periodSet[i] != market.D1 {
- for n, tmpcandle := range v.candless[i] {
- day := tmpcandle.Timestamp / (1000 * 3600 * 24)
- if day != v.dayLasts[i] && v.dayLasts[i] != 0 {
- fname, err := market.SaveCandlesTmp(dir, k, v.candless[i][:n], periodSet[i], false)
- if err != nil {
- //log.Println(fname, err)
- } else {
- cm.tmpFileNameMap[fname] = 0
- //log.Println(fname)
- }
- v.candless[i] = v.candless[i][n:]
- }
- v.dayLasts[i] = day
- }
- }
- fname, err := market.SaveCandlesTmp(dir, k, v.candless[i], periodSet[i], false)
- if err != nil {
- //log.Println(fname, err)
- } else {
- if periodSet[i] != market.D1 {
- cm.tmpFileNameMap[fname] = 0
- }
- //log.Println(fname)
- }
- v.candless[i] = nil
- }
- }
- for k, _ := range cm.tmpFileNameMap {
- fname := strings.TrimSuffix(k, ".tmp")
- //log.Println(fname, k)
- if fname != k {
- if _, err := os.Stat(fname); os.IsNotExist(err) {
- if _, err := os.Stat(k); err == nil {
- for err = os.Rename(k, fname); err != nil; err = os.Rename(k, fname) {
- time.Sleep(time.Second)
- }
- }
- }
- }
- }
- cm.tmpFileNameMap = nil
- cm.tmpFileNameMap = make(map[string]int)
- continue
- }
- insIdStr := cm.gds.getInsIdStr(m.InsId)
- if insIdStr == "" {
- //log.Println("wrong insId:", cm.typ, m.InsId)
- continue
- }
- fcm, ok := cm.fileCandleMakersMap[insIdStr]
- if !ok {
- candleGenerators := make([]*base.Candle, len(periodSet))
- ohlcs := make([]base.Ohlc, len(periodSet))
- for i, period := range periodSet {
- candleGenerators[i], _ = base.NewCandle(period, 2, nil, 0)
- if strings.HasPrefix(insIdStr, Ctp) {
- candleGenerators[i].Set(base.CANDLE_AUTOCOMPLETE_MAX, 1)
- }
- ohlcs[i] = base.Ohlc{}
- }
- fcm = &FileCandleMaker{
- candleGenerators: candleGenerators[:],
- ohlcs: ohlcs[:],
- candless: make([][]market.Candle, len(periodSet)),
- dayLasts: make([]int64, len(periodSet)),
- }
- cm.fileCandleMakersMap[insIdStr] = fcm
- }
- for i, candleGenerator := range fcm.candleGenerators {
- tg := Mk2Tg(*m)
- num := candleGenerator.UpdateTick((*base.Tick)(unsafe.Pointer(&tg)))
- var tmpcandles []market.Candle
- if num == 0 {
- candleGenerator.Next(&fcm.ohlcs[i])
- ohlcGo := fcm.ohlcs[i].ToGOStruct()
- tmpcandles = append(tmpcandles, OhlcGo2Candle(ohlcGo))
- } else if num > 0 {
- for j := 0; j < num; j++ {
- candleGenerator.Next(&fcm.ohlcs[i])
- ohlcGo := fcm.ohlcs[i].ToGOStruct()
- tmpcandles = append(tmpcandles, OhlcGo2Candle(ohlcGo))
- }
- } else {
- //log.Println("tick error.")
- }
- for _, tmpcandle := range tmpcandles {
- if periodSet[i] != market.D1 {
- day := tmpcandle.Timestamp / (1000 * 3600 * 24)
- if day != fcm.dayLasts[i] && fcm.dayLasts[i] != 0 {
- fname, err := market.SaveCandlesTmp(dir, insIdStr, fcm.candless[i], periodSet[i], false)
- if err != nil {
- //log.Println(fname, err)
- } else {
- cm.tmpFileNameMap[fname] = 0
- //log.Println(fname)
- }
- fcm.candless[i] = nil
- }
- fcm.dayLasts[i] = day
- }
- if len(fcm.candless[i]) > 0 && fcm.candless[i][len(fcm.candless[i])-1].Timestamp == tmpcandle.Timestamp {
- fcm.candless[i][len(fcm.candless[i])-1] = tmpcandle
- } else {
- fcm.candless[i] = append(fcm.candless[i], tmpcandle)
- }
- }
- }
- }
- }
- func Mk2Tg(mk Market2) base.TickGo {
- var tg base.TickGo
- tg.Time = int32(mk.Timestamp / 1000)
- tg.Ms = int16(mk.Timestamp % 1000)
- tg.Symbol = 0
- tg.Bid = float32(mk.LastPrice) //tk.Bid[0]
- //tg.Ask = float32(tk.Price) //tk.Ask[0]
- tg.Bidv = float32(mk.LastVolume) //tk.Bid[1]
- //tg.Askv = int32(tk.Volume) //tk.Ask[1]
- return tg
- }
- func Mk2Tk(mk *Market2) market.Tick {
- var tk market.Tick
- tk.Timestamp = mk.Timestamp
- tk.Price = mk.LastPrice
- tk.Volume = mk.LastVolume
- tk.Ask[0] = mk.Asks[0][0]
- tk.Ask[1] = mk.Asks[0][1]
- tk.Bid[0] = mk.Bids[0][0]
- tk.Bid[1] = mk.Bids[0][1]
- return tk
- }
- func OhlcGo2Candle(ohlcGo base.OhlcGo) market.Candle {
- var c market.Candle
- c.Timestamp = int64(ohlcGo.Time) * 1000
- c.Open = ohlcGo.Open
- c.High = ohlcGo.High
- c.Low = ohlcGo.Low
- c.Close = ohlcGo.Close
- c.TickVolums = float64(ohlcGo.TickVolumn)
- c.RealVolums = float64(ohlcGo.RealVolumn)
- return c
- }
|