// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick // 本文件实现general数据源接口, 实时数据和历史数据的获取和保存 import ( "bytes" "encoding/json" "errors" "fmt" "io" "io/ioutil" //"log" "net/http" "path" "strconv" "strings" "time" "tickserver/framework/event" "tickserver/markinfo" "tickserver/server/market" "github.com/niniwzw/http2" ) var tr = &http2.Transport{ InsecureTLSDial: true, Timeout: 2000 * time.Millisecond, } // GeneralDS实现了dataSource接口, 并对general的历史数据和实时数据保存 type GeneralDS struct { *market.DSBase cm *CandleMaker client *http.Client insPublisher event.EventPublisher insMap map[string]*market.Instrument idMappingMap map[int64]string typ string typId int url string st int64 curId int64 } type JsonResp2 struct { Result *json.RawMessage `json:"result"` Err string `json:"err"` Code int `json:"code"` } func NewGeneralDS(typ, url, fileserver, dataDir string, db *market.MyDB, bSSL bool) (*GeneralDS, error) { dir := dataDir + "/" + typ //now := time.Now() //st := time.Date(now.Year(), now.Month(), now.Day(), 6, 0, 0, 0, time.Local).Unix() * 1000 typId := TypeId(typ) var client *http.Client if bSSL { client = &http.Client{Transport: tr} } else { client = &http.Client{} } gds := &GeneralDS{ DSBase: market.NewDsBase(db, dir), insMap: make(map[string]*market.Instrument), idMappingMap: make(map[int64]string), typ: typ, typId: typId, url: url, //st: st, client: client, cm: &CandleMaker{ typ: typ, typId: typId, url: url, dataDir: dataDir, fileserver: fileserver, db: db, client: client, }, } gds.cm.gds = gds return gds, nil } func (gds *GeneralDS) SubIns() *event.Event { return gds.insPublisher.Event() } func (gds *GeneralDS) Run() { //log.Println("GeneralDS.Run, type:", gds.typ) /*goroutineNum := 64 switch gds.typId { case IntLmax: goroutineNum = 8 case IntEasyForex: goroutineNum = 4 case IntCtp: goroutineNum = 32 case IntDzh: goroutineNum = 64 case IntTdx: goroutineNum = 512 }*/ go gds.DoReadEx() go gds.cm.run() //log.Println("getInss begin", gds.typ) gds.getInss() //log.Println("getInss end", gds.typ) //log.Println("load begin", gds.typ) gds.load() //log.Println("load end", gds.typ) //gds.download() //log.Println("stream begin", gds.typ) gds.stream() //log.Println("stream end", gds.typ) } func (gds *GeneralDS) load() { year := fmt.Sprintf("%d", time.Now().Year()) for i, v := range gds.insMap { dir := path.Join(gds.cm.dataDir, gds.typ, i, year) //log.Println(dir) infos, err := ioutil.ReadDir(dir) if err != nil { //log.Println(err) continue } var curname string for j := 0; j < len(infos); j++ { if strings.HasSuffix(infos[j].Name(), ".tk.gz") { curname = infos[j].Name() } } tickfile := path.Join(dir, curname) //log.Println(tickfile) ticks, _ := market.ReadTickFile(tickfile) for _, tick := range ticks { var mk market.Market mk.InsId = v.Id mk.Timestamp = tick.Timestamp mk.Close = tick.Price mk.Low = tick.Price mk.High = tick.Price mk.Open = tick.Price mk.AllAmount = tick.Volume mk.AllVolume = tick.Volume mk.LastPrice = tick.Price mk.Volume = tick.Volume mk.Bids = make([]market.PP, 1) mk.Bids = append(mk.Bids, tick.Bid) mk.Asks = make([]market.PP, 1) mk.Asks = append(mk.Asks, tick.Ask) //time.Sleep(time.Millisecond) gds.SaveL(&mk) } } } func (gds *GeneralDS) getIns(id int64) (Instrument, error) { var ins Instrument req := &InstrumentRequest{Type: gds.typ, Id: id} body, err := httpReq(gds.client, "instrument", gds.url, req) if err != nil { //log.Println("httpreq", err) return ins, err } _, err = decodeResp(body, &ins) if err != nil { //log.Println("decodeResponse", err) return ins, err } return ins, nil } func (gds *GeneralDS) getInss() { for { req := &InstrumentsRequest{Type: gds.typ} body, err := httpReq(gds.client, "instruments", gds.url, req) if err != nil { //log.Println("httpreq", err) time.Sleep(10 * time.Second) continue } var inss []Instrument _, err = decodeResp(body, &inss) if err != nil { //log.Println("decodeResponse", err) return } for _, v := range inss { ins := tkIns2mkIns(gds.typ, v) //if tk.Type == IntSina { //log.Println("sssss", *ins) //} gds.idMappingMap[v.Id] = ins.Id gds.insMap[ins.Id] = ins gds.insPublisher.Publish(ins) } break } } func tkIns2mkIns(typ string, tIns Instrument) *market.Instrument { var insId string if typ == DataTypeName(IntCtp) { insId = typ + "_" intTyp := tIns.Id / 10000 insTyp, _ := ctpTyps[int(intTyp)] insId += insTyp intSuffix := tIns.Id % 10000 insId += strconv.Itoa(int(intSuffix)) } else if typ == DataTypeName(IntEasyForex) || typ == DataTypeName(IntOanda) || typ == DataTypeName(IntBtc) || typ == DataTypeName(IntPolo) || typ == DataTypeName(IntBty) || typ == DataTypeName(IntCFix) || typ == DataTypeName(IntHuobi) || typ == DataTypeName(IntYunbi) || typ == DataTypeName(IntChbtc) { idStr, _ := markinfo.SymbolName(int(tIns.Id)) insId = typ + "_" + idStr } else { if typ == Sina || typ == Dzh || typ == Tdx { insId = typ + "_" + fmt.Sprintf("%06d", tIns.Id) } else { insId = typ + "_" + fmt.Sprintf("%d", tIns.Id) } } ins := &market.Instrument{ Id: insId, Name: tIns.Name, Typ: tIns.Type, ExId: tIns.ExId, PriceInc: tIns.PriceInc, Margin: tIns.Margin, StartTime: tIns.StartTime, EndTime: tIns.EndTime, } return ins } func (gds *GeneralDS) download() { var offset int num := 1000 downstart := gds.st downid := gds.curId for num >= 1000 { req := &DownloadRequest{Type: gds.typ, Start: downstart, End: downid, Offset: offset, Count: 1000, OrderBy: "time asc"} //log.Println("download", req) body, err := httpReq(gds.client, "download", gds.url, req) if err != nil { //log.Println("httpReq", err) return } var ticks []*Market _, err = decodeResp(body, &ticks) if err != nil { //log.Println("decodeResp", err) return } //log.Println("download num:", len(ticks)) for _, v := range ticks { if v.Type != int32(gds.typId) { //log.Println("download wrongggggggggg typ", v.Type, gds.typId) continue } //gds.st = v.Timestamp //gds.curId = v.InsId ins := gds.addIns(v.InsId) if nil == ins { continue } mk := tMk2mMk(*v, ins) //ins.SetMk(&mk) gds.Save(&mk) //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp) } //log.Println(ticks) num = len(ticks) offset += num } } func (gds *GeneralDS) stream() { for { req := &StreamRequest{Type: gds.typ} //log.Println("streamReq begin", gds.typ) body, err := streamReq(gds.client, "stream", gds.url, req) if err != nil { //log.Println("streamReq", err) time.Sleep(10 * time.Second) continue } //log.Println("streamReq end", gds.typ) decoder := json.NewDecoder(body) var tick Market for { //fmt.Printf("[O]") err = decoder.Decode(&tick) //fmt.Printf("[C]") if err != nil { body.Close() //log.Println("connect retry...", err) time.Sleep(time.Second * 1) break } if tick.Type != int32(gds.typId) { //log.Println("stream wrongggggggggg typ", tick.Type, gds.typId) continue } gds.st = tick.Timestamp gds.curId = tick.InsId ins := gds.addIns(tick.InsId) if nil == ins { continue } //if gds.typId == IntTdx && tick.InsId == 1 { //log.Println("[stream]data trace") //} mk := tMk2mMk(tick, ins) ins.SetMk(&mk) gds.Save(&mk) } } } func decodeResp(datain []byte, dataout interface{}) (int, error) { var resp JsonResp2 err := json.Unmarshal(datain, &resp) if err != nil { return -1, err } if resp.Err != "" { return -1, errors.New(resp.Err) } if resp.Result == nil { return 0, nil } json.Unmarshal(*resp.Result, &dataout) return 0, nil } func httpReq(client *http.Client, name, url string, req interface{}) ([]byte, error) { s, err := json.Marshal(req) if err != nil { return nil, err } resp, err := client.Post(url+name, "text/json", bytes.NewBuffer(s)) if err != nil { return nil, err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } //log.Println(string(body)) return body, nil } func streamReq(client *http.Client, name, url string, req interface{}) (io.ReadCloser, error) { s, err := json.Marshal(req) if err != nil { return nil, err } resp, err := client.Post(url+name, "text/json", bytes.NewBuffer(s)) if err != nil { return nil, err } return resp.Body, err } func (gds *GeneralDS) getInsIdStr(intInsId int64) string { insIdStr, ok := gds.idMappingMap[intInsId] if ok { return insIdStr } else { return "" } } func (gds *GeneralDS) addIns(insId int64) *market.Instrument { insIdStr, ok := gds.idMappingMap[insId] if ok { ins, ok1 := gds.insMap[insIdStr] if !ok1 { //log.Println("addIns wrongggggggggg idmapping", gds.typ, insId, insIdStr) } return ins } else { tmpIns, err := gds.getIns(insId) if err != nil { //log.Println("addIns wrongggggggggg ins", gds.typ, insId, err) return nil } if tmpIns.Id != insId { //log.Println("addIns wrongggggggggg insId", gds.typ, insId, tmpIns.Id) return nil } ins := tkIns2mkIns(gds.typ, tmpIns) //if tk.Type == IntSina { //log.Println("sssss", insId, tmpIns, *ins) //} gds.insMap[ins.Id] = ins gds.idMappingMap[tmpIns.Id] = ins.Id gds.insPublisher.Publish(ins) return ins } return nil } func tMk2mMk(tick Market, ins *market.Instrument) market.Market { mk := market.Market{ InsId: ins.Id, Timestamp: tick.Timestamp, Close: tick.Close, Open: tick.Open, High: tick.High, Low: tick.Low, AllVolume: tick.AllVolume, AllAmount: tick.AllAmount, LastPrice: tick.LastPrice, Volume: tick.LastVolume, } mk.Bids = make([]market.PP, len(tick.Bids)) for i, v := range tick.Bids { mk.Bids[i][0] = v[0] mk.Bids[i][1] = v[1] } mk.Asks = make([]market.PP, len(tick.Asks)) for i, v := range tick.Asks { mk.Asks[i][0] = v[0] mk.Asks[i][1] = v[1] } mk.SetIns(ins) return mk } const ( jm = iota TC RM SR bu a ru i al y b zn fb j ag PM TA IF c pb l TF hc SF m fu wr CF RI v JR SM cu rb bb pp FG RS WH au jd p MA LR IH IC ) var ctpTypMap map[string]int var ctpTyps = map[int]string{ jm: "JM", TC: "TC", RM: "RM", SR: "SR", bu: "BU", a: "A", ru: "RU", i: "I", al: "AL", y: "Y", b: "B", zn: "ZN", fb: "FB", j: "J", ag: "AG", PM: "PM", TA: "TA", IF: "IF", c: "C", pb: "PB", l: "L", TF: "TF", hc: "HC", SF: "SF", m: "M", fu: "FU", wr: "WR", CF: "CF", RI: "RI", v: "V", JR: "JR", SM: "SM", cu: "CU", rb: "RB", bb: "BB", pp: "PP", FG: "FG", RS: "RS", WH: "WH", au: "AU", jd: "JD", p: "P", MA: "MA", LR: "LR", IH: "IH", IC: "IC", } type tPoint struct { hour, minute int } type tInterval struct { st, et tPoint } var shefTi = []tInterval{ {tPoint{9, 0}, tPoint{10, 15}}, {tPoint{10, 30}, tPoint{11, 30}}, {tPoint{13, 30}, tPoint{15, 0}}, {tPoint{21, 0}, tPoint{23, 59}}, {tPoint{0, 0}, tPoint{2, 30}}, } var decAndCzceTi = []tInterval{ {tPoint{9, 0}, tPoint{10, 15}}, {tPoint{10, 30}, tPoint{11, 30}}, {tPoint{13, 30}, tPoint{15, 0}}, } var cffexTi = []tInterval{ {tPoint{9, 15}, tPoint{11, 30}}, {tPoint{13, 0}, tPoint{15, 15}}, } /*func (gds *GeneralDS) download() { var offset int num := 1000 start := gds.st for num >= 1000 { req := &DownloadRequest{Type: gds.typ, Start: start, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"} log.Println("download", req) body, err := httpReq(gds.client, "download", gds.url, req) if err != nil { log.Println("httpReq", err) return } var ticks []*Market _, err = decodeResp(body, &ticks) if err != nil { log.Println("decodeResp", err) return } log.Println("download num:", len(ticks)) for _, v := range ticks { if v.Type != int32(gds.typId) { log.Println("download wrongggggggggg typ", v.Type, gds.typId) continue } gds.st = v.Timestamp ins := gds.addIns(*v) if nil == ins { continue } mk := tMk2mMk(*v, ins) //ins.SetMk(&mk) gds.Save(&mk) //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp) } //log.Println(ticks) num = len(ticks) offset += num } }*/ /*func (gds *GeneralDS) download() { var offset int var lastSt, lastTime int64 bSorted := true num := 1000 for num >= 1000 { if bSorted { if gds.st == lastSt { offset += num } else { offset = 0 } } else { gds.st = lastSt offset += num } req := &DownloadRequest{Type: gds.typ, Start: gds.st, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"} log.Println("download", req) lastSt = gds.st body, err := httpReq(gds.client, "download", gds.url, req) if err != nil { log.Println("httpReq", err) return } var ticks []*Market _, err = decodeResp(body, &ticks) if err != nil { log.Println("decodeResp", err) return } log.Println("download num:", len(ticks)) for _, v := range ticks { gds.st = v.Timestamp if gds.st < lastTime { bSorted = false log.Println("not sorted") } lastTime = gds.st ins := gds.addIns(*v) mk := tMk2mMk(*v, ins) ins.SetMk(&mk) gds.Save(&mk) //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp) } //log.Println(ticks) num = len(ticks) //offset += num } }*/