|
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- // 本文件实现general数据源接口, 实时数据和历史数据的获取和保存
- import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- //"log"
- "net/http"
- "path"
- "strconv"
- "strings"
- "time"
- "tickserver/framework/event"
- "tickserver/markinfo"
- "tickserver/server/market"
- "github.com/niniwzw/http2"
- )
- var tr = &http2.Transport{
- InsecureTLSDial: true,
- Timeout: 2000 * time.Millisecond,
- }
- // GeneralDS实现了dataSource接口, 并对general的历史数据和实时数据保存
- type GeneralDS struct {
- *market.DSBase
- cm *CandleMaker
- client *http.Client
- insPublisher event.EventPublisher
- insMap map[string]*market.Instrument
- idMappingMap map[int64]string
- typ string
- typId int
- url string
- st int64
- curId int64
- }
- type JsonResp2 struct {
- Result *json.RawMessage `json:"result"`
- Err string `json:"err"`
- Code int `json:"code"`
- }
- func NewGeneralDS(typ, url, fileserver, dataDir string, db *market.MyDB, bSSL bool) (*GeneralDS, error) {
- dir := dataDir + "/" + typ
- //now := time.Now()
- //st := time.Date(now.Year(), now.Month(), now.Day(), 6, 0, 0, 0, time.Local).Unix() * 1000
- typId := TypeId(typ)
- var client *http.Client
- if bSSL {
- client = &http.Client{Transport: tr}
- } else {
- client = &http.Client{}
- }
- gds := &GeneralDS{
- DSBase: market.NewDsBase(db, dir),
- insMap: make(map[string]*market.Instrument),
- idMappingMap: make(map[int64]string),
- typ: typ,
- typId: typId,
- url: url,
- //st: st,
- client: client,
- cm: &CandleMaker{
- typ: typ,
- typId: typId,
- url: url,
- dataDir: dataDir,
- fileserver: fileserver,
- db: db,
- client: client,
- },
- }
- gds.cm.gds = gds
- return gds, nil
- }
- func (gds *GeneralDS) SubIns() *event.Event {
- return gds.insPublisher.Event()
- }
- func (gds *GeneralDS) Run() {
- //log.Println("GeneralDS.Run, type:", gds.typ)
- /*goroutineNum := 64
- switch gds.typId {
- case IntLmax:
- goroutineNum = 8
- case IntEasyForex:
- goroutineNum = 4
- case IntCtp:
- goroutineNum = 32
- case IntDzh:
- goroutineNum = 64
- case IntTdx:
- goroutineNum = 512
- }*/
- go gds.DoReadEx()
- go gds.cm.run()
- //log.Println("getInss begin", gds.typ)
- gds.getInss()
- //log.Println("getInss end", gds.typ)
- //log.Println("load begin", gds.typ)
- gds.load()
- //log.Println("load end", gds.typ)
- //gds.download()
- //log.Println("stream begin", gds.typ)
- gds.stream()
- //log.Println("stream end", gds.typ)
- }
- func (gds *GeneralDS) load() {
- year := fmt.Sprintf("%d", time.Now().Year())
- for i, v := range gds.insMap {
- dir := path.Join(gds.cm.dataDir, gds.typ, i, year)
- //log.Println(dir)
- infos, err := ioutil.ReadDir(dir)
- if err != nil {
- //log.Println(err)
- continue
- }
- var curname string
- for j := 0; j < len(infos); j++ {
- if strings.HasSuffix(infos[j].Name(), ".tk.gz") {
- curname = infos[j].Name()
- }
- }
- tickfile := path.Join(dir, curname)
- //log.Println(tickfile)
- ticks, _ := market.ReadTickFile(tickfile)
- for _, tick := range ticks {
- var mk market.Market
- mk.InsId = v.Id
- mk.Timestamp = tick.Timestamp
- mk.Close = tick.Price
- mk.Low = tick.Price
- mk.High = tick.Price
- mk.Open = tick.Price
- mk.AllAmount = tick.Volume
- mk.AllVolume = tick.Volume
- mk.LastPrice = tick.Price
- mk.Volume = tick.Volume
- mk.Bids = make([]market.PP, 1)
- mk.Bids = append(mk.Bids, tick.Bid)
- mk.Asks = make([]market.PP, 1)
- mk.Asks = append(mk.Asks, tick.Ask)
- //time.Sleep(time.Millisecond)
- gds.SaveL(&mk)
- }
- }
- }
- func (gds *GeneralDS) getIns(id int64) (Instrument, error) {
- var ins Instrument
- req := &InstrumentRequest{Type: gds.typ, Id: id}
- body, err := httpReq(gds.client, "instrument", gds.url, req)
- if err != nil {
- //log.Println("httpreq", err)
- return ins, err
- }
- _, err = decodeResp(body, &ins)
- if err != nil {
- //log.Println("decodeResponse", err)
- return ins, err
- }
- return ins, nil
- }
- func (gds *GeneralDS) getInss() {
- for {
- req := &InstrumentsRequest{Type: gds.typ}
- body, err := httpReq(gds.client, "instruments", gds.url, req)
- if err != nil {
- //log.Println("httpreq", err)
- time.Sleep(10 * time.Second)
- continue
- }
- var inss []Instrument
- _, err = decodeResp(body, &inss)
- if err != nil {
- //log.Println("decodeResponse", err)
- return
- }
- for _, v := range inss {
- ins := tkIns2mkIns(gds.typ, v)
- //if tk.Type == IntSina {
- //log.Println("sssss", *ins)
- //}
- gds.idMappingMap[v.Id] = ins.Id
- gds.insMap[ins.Id] = ins
- gds.insPublisher.Publish(ins)
- }
- break
- }
- }
- func tkIns2mkIns(typ string, tIns Instrument) *market.Instrument {
- var insId string
- if typ == DataTypeName(IntCtp) {
- insId = typ + "_"
- intTyp := tIns.Id / 10000
- insTyp, _ := ctpTyps[int(intTyp)]
- insId += insTyp
- intSuffix := tIns.Id % 10000
- insId += strconv.Itoa(int(intSuffix))
- } else if typ == DataTypeName(IntEasyForex) || typ == DataTypeName(IntOanda) || typ == DataTypeName(IntBtc) || typ == DataTypeName(IntPolo) || typ == DataTypeName(IntBty) || typ == DataTypeName(IntCFix) || typ == DataTypeName(IntHuobi) || typ == DataTypeName(IntYunbi) || typ == DataTypeName(IntChbtc) {
- idStr, _ := markinfo.SymbolName(int(tIns.Id))
- insId = typ + "_" + idStr
- } else {
- if typ == Sina || typ == Dzh || typ == Tdx {
- insId = typ + "_" + fmt.Sprintf("%06d", tIns.Id)
- } else {
- insId = typ + "_" + fmt.Sprintf("%d", tIns.Id)
- }
- }
- ins := &market.Instrument{
- Id: insId,
- Name: tIns.Name,
- Typ: tIns.Type,
- ExId: tIns.ExId,
- PriceInc: tIns.PriceInc,
- Margin: tIns.Margin,
- StartTime: tIns.StartTime,
- EndTime: tIns.EndTime,
- }
- return ins
- }
- func (gds *GeneralDS) download() {
- var offset int
- num := 1000
- downstart := gds.st
- downid := gds.curId
- for num >= 1000 {
- req := &DownloadRequest{Type: gds.typ, Start: downstart, End: downid, Offset: offset, Count: 1000, OrderBy: "time asc"}
- //log.Println("download", req)
- body, err := httpReq(gds.client, "download", gds.url, req)
- if err != nil {
- //log.Println("httpReq", err)
- return
- }
- var ticks []*Market
- _, err = decodeResp(body, &ticks)
- if err != nil {
- //log.Println("decodeResp", err)
- return
- }
- //log.Println("download num:", len(ticks))
- for _, v := range ticks {
- if v.Type != int32(gds.typId) {
- //log.Println("download wrongggggggggg typ", v.Type, gds.typId)
- continue
- }
- //gds.st = v.Timestamp
- //gds.curId = v.InsId
- ins := gds.addIns(v.InsId)
- if nil == ins {
- continue
- }
- mk := tMk2mMk(*v, ins)
- //ins.SetMk(&mk)
- gds.Save(&mk)
- //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp)
- }
- //log.Println(ticks)
- num = len(ticks)
- offset += num
- }
- }
- func (gds *GeneralDS) stream() {
- for {
- req := &StreamRequest{Type: gds.typ}
- //log.Println("streamReq begin", gds.typ)
- body, err := streamReq(gds.client, "stream", gds.url, req)
- if err != nil {
- //log.Println("streamReq", err)
- time.Sleep(10 * time.Second)
- continue
- }
- //log.Println("streamReq end", gds.typ)
- decoder := json.NewDecoder(body)
- var tick Market
- for {
- //fmt.Printf("[O]")
- err = decoder.Decode(&tick)
- //fmt.Printf("[C]")
- if err != nil {
- body.Close()
- //log.Println("connect retry...", err)
- time.Sleep(time.Second * 1)
- break
- }
- if tick.Type != int32(gds.typId) {
- //log.Println("stream wrongggggggggg typ", tick.Type, gds.typId)
- continue
- }
- gds.st = tick.Timestamp
- gds.curId = tick.InsId
- ins := gds.addIns(tick.InsId)
- if nil == ins {
- continue
- }
- //if gds.typId == IntTdx && tick.InsId == 1 {
- //log.Println("[stream]data trace")
- //}
- mk := tMk2mMk(tick, ins)
- ins.SetMk(&mk)
- gds.Save(&mk)
- }
- }
- }
- func decodeResp(datain []byte, dataout interface{}) (int, error) {
- var resp JsonResp2
- err := json.Unmarshal(datain, &resp)
- if err != nil {
- return -1, err
- }
- if resp.Err != "" {
- return -1, errors.New(resp.Err)
- }
- if resp.Result == nil {
- return 0, nil
- }
- json.Unmarshal(*resp.Result, &dataout)
- return 0, nil
- }
- func httpReq(client *http.Client, name, url string, req interface{}) ([]byte, error) {
- s, err := json.Marshal(req)
- if err != nil {
- return nil, err
- }
- resp, err := client.Post(url+name, "text/json", bytes.NewBuffer(s))
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- //log.Println(string(body))
- return body, nil
- }
- func streamReq(client *http.Client, name, url string, req interface{}) (io.ReadCloser, error) {
- s, err := json.Marshal(req)
- if err != nil {
- return nil, err
- }
- resp, err := client.Post(url+name, "text/json", bytes.NewBuffer(s))
- if err != nil {
- return nil, err
- }
- return resp.Body, err
- }
- func (gds *GeneralDS) getInsIdStr(intInsId int64) string {
- insIdStr, ok := gds.idMappingMap[intInsId]
- if ok {
- return insIdStr
- } else {
- return ""
- }
- }
- func (gds *GeneralDS) addIns(insId int64) *market.Instrument {
- insIdStr, ok := gds.idMappingMap[insId]
- if ok {
- ins, ok1 := gds.insMap[insIdStr]
- if !ok1 {
- //log.Println("addIns wrongggggggggg idmapping", gds.typ, insId, insIdStr)
- }
- return ins
- } else {
- tmpIns, err := gds.getIns(insId)
- if err != nil {
- //log.Println("addIns wrongggggggggg ins", gds.typ, insId, err)
- return nil
- }
- if tmpIns.Id != insId {
- //log.Println("addIns wrongggggggggg insId", gds.typ, insId, tmpIns.Id)
- return nil
- }
- ins := tkIns2mkIns(gds.typ, tmpIns)
- //if tk.Type == IntSina {
- //log.Println("sssss", insId, tmpIns, *ins)
- //}
- gds.insMap[ins.Id] = ins
- gds.idMappingMap[tmpIns.Id] = ins.Id
- gds.insPublisher.Publish(ins)
- return ins
- }
- return nil
- }
- func tMk2mMk(tick Market, ins *market.Instrument) market.Market {
- mk := market.Market{
- InsId: ins.Id,
- Timestamp: tick.Timestamp,
- Close: tick.Close,
- Open: tick.Open,
- High: tick.High,
- Low: tick.Low,
- AllVolume: tick.AllVolume,
- AllAmount: tick.AllAmount,
- LastPrice: tick.LastPrice,
- Volume: tick.LastVolume,
- }
- mk.Bids = make([]market.PP, len(tick.Bids))
- for i, v := range tick.Bids {
- mk.Bids[i][0] = v[0]
- mk.Bids[i][1] = v[1]
- }
- mk.Asks = make([]market.PP, len(tick.Asks))
- for i, v := range tick.Asks {
- mk.Asks[i][0] = v[0]
- mk.Asks[i][1] = v[1]
- }
- mk.SetIns(ins)
- return mk
- }
- const (
- jm = iota
- TC
- RM
- SR
- bu
- a
- ru
- i
- al
- y
- b
- zn
- fb
- j
- ag
- PM
- TA
- IF
- c
- pb
- l
- TF
- hc
- SF
- m
- fu
- wr
- CF
- RI
- v
- JR
- SM
- cu
- rb
- bb
- pp
- FG
- RS
- WH
- au
- jd
- p
- MA
- LR
- IH
- IC
- )
- var ctpTypMap map[string]int
- var ctpTyps = map[int]string{
- jm: "JM",
- TC: "TC",
- RM: "RM",
- SR: "SR",
- bu: "BU",
- a: "A",
- ru: "RU",
- i: "I",
- al: "AL",
- y: "Y",
- b: "B",
- zn: "ZN",
- fb: "FB",
- j: "J",
- ag: "AG",
- PM: "PM",
- TA: "TA",
- IF: "IF",
- c: "C",
- pb: "PB",
- l: "L",
- TF: "TF",
- hc: "HC",
- SF: "SF",
- m: "M",
- fu: "FU",
- wr: "WR",
- CF: "CF",
- RI: "RI",
- v: "V",
- JR: "JR",
- SM: "SM",
- cu: "CU",
- rb: "RB",
- bb: "BB",
- pp: "PP",
- FG: "FG",
- RS: "RS",
- WH: "WH",
- au: "AU",
- jd: "JD",
- p: "P",
- MA: "MA",
- LR: "LR",
- IH: "IH",
- IC: "IC",
- }
- type tPoint struct {
- hour, minute int
- }
- type tInterval struct {
- st, et tPoint
- }
- var shefTi = []tInterval{
- {tPoint{9, 0}, tPoint{10, 15}},
- {tPoint{10, 30}, tPoint{11, 30}},
- {tPoint{13, 30}, tPoint{15, 0}},
- {tPoint{21, 0}, tPoint{23, 59}},
- {tPoint{0, 0}, tPoint{2, 30}},
- }
- var decAndCzceTi = []tInterval{
- {tPoint{9, 0}, tPoint{10, 15}},
- {tPoint{10, 30}, tPoint{11, 30}},
- {tPoint{13, 30}, tPoint{15, 0}},
- }
- var cffexTi = []tInterval{
- {tPoint{9, 15}, tPoint{11, 30}},
- {tPoint{13, 0}, tPoint{15, 15}},
- }
- /*func (gds *GeneralDS) download() {
- var offset int
- num := 1000
- start := gds.st
- for num >= 1000 {
- req := &DownloadRequest{Type: gds.typ, Start: start, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"}
- log.Println("download", req)
- body, err := httpReq(gds.client, "download", gds.url, req)
- if err != nil {
- log.Println("httpReq", err)
- return
- }
- var ticks []*Market
- _, err = decodeResp(body, &ticks)
- if err != nil {
- log.Println("decodeResp", err)
- return
- }
- log.Println("download num:", len(ticks))
- for _, v := range ticks {
- if v.Type != int32(gds.typId) {
- log.Println("download wrongggggggggg typ", v.Type, gds.typId)
- continue
- }
- gds.st = v.Timestamp
- ins := gds.addIns(*v)
- if nil == ins {
- continue
- }
- mk := tMk2mMk(*v, ins)
- //ins.SetMk(&mk)
- gds.Save(&mk)
- //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp)
- }
- //log.Println(ticks)
- num = len(ticks)
- offset += num
- }
- }*/
- /*func (gds *GeneralDS) download() {
- var offset int
- var lastSt, lastTime int64
- bSorted := true
- num := 1000
- for num >= 1000 {
- if bSorted {
- if gds.st == lastSt {
- offset += num
- } else {
- offset = 0
- }
- } else {
- gds.st = lastSt
- offset += num
- }
- req := &DownloadRequest{Type: gds.typ, Start: gds.st, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"}
- log.Println("download", req)
- lastSt = gds.st
- body, err := httpReq(gds.client, "download", gds.url, req)
- if err != nil {
- log.Println("httpReq", err)
- return
- }
- var ticks []*Market
- _, err = decodeResp(body, &ticks)
- if err != nil {
- log.Println("decodeResp", err)
- return
- }
- log.Println("download num:", len(ticks))
- for _, v := range ticks {
- gds.st = v.Timestamp
- if gds.st < lastTime {
- bSorted = false
- log.Println("not sorted")
- }
- lastTime = gds.st
- ins := gds.addIns(*v)
- mk := tMk2mMk(*v, ins)
- ins.SetMk(&mk)
- gds.Save(&mk)
- //printDelay(DataTypeName(int(v.Type)), "eeeee"+fmt.Sprint(v.InsId), v.Timestamp)
- }
- //log.Println(ticks)
- num = len(ticks)
- //offset += num
- }
- }*/
|