// +build linux // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick // 本文件实现盛宝saxo数据源接口, 实时数据和历史数据的获取和保存 import ( "encoding/csv" "errors" "log" "os" "strconv" "strings" "time" "tickserver/api/saxocgo" "tickserver/markinfo" "tickserver/server/market" ) type InsInfo struct { Id string Name string ExId string PriceInc float64 Margin float64 StartTime string } // SaxoFixDS实现了dataSource接口, 并对fix的历史数据和实时数据保存 type SaxoFixDS struct { *DSBase conf *DsConf symbolMap map[string]string //insMap map[int64]*market.Instrument } func init() { drivers[Saxo] = newSaxoDS } func newSaxoDS(conf *DsConf) (DataSource, error) { insMap, symbolMap, err := parseCSV(conf.SymbolsFile) if err != nil { return nil, err } sds := &SaxoFixDS{ DSBase: NewDsBase(conf), conf: conf, symbolMap: symbolMap, //insMap: insMap, } sds.insMap = insMap return sds, nil } func parseCSV(name string) (map[int64]*Instrument, map[string]string, error) { f, err := os.Open(name) if err != nil { return nil, nil, err } r := csv.NewReader(f) insMap := make(map[int64]*Instrument) symbolMap := make(map[string]string) first := true for { ss, err := r.Read() if err != nil { break } if len(ss) != 1 { continue } if first { first = false continue } s := strings.Trim(ss[0], " ") symbol := strings.Replace(s, "/", "", 1) // EUR/USD ==> EURUSD id, err := markinfo.SymbolId(symbol) if err != nil { log.Println(err) continue } unit, err := markinfo.SymbolUint(symbol) if err != nil { log.Println(err) continue } symbolMap[strconv.Itoa(id)] = s ins := &Instrument{ Id: int64(id), //market.SaxoPrefix + symbol, Name: s, ExId: market.Saxo, PriceInc: unit, Type: market.Forex, StartTime: time.Date(2014, 12, 31, 0, 0, 0, 0, time.Local).Unix() * 1000, } id64 := int64(id) insMap[id64] = ins } return insMap, symbolMap, nil } //func (fds *SaxoFixDS) SubIns() *event.Event { //return fds.insPublisher.Event() //} func (fds *SaxoFixDS) Name() string { return Saxo } func (fds *SaxoFixDS) Run() { log.Println("SaxoFixDS.Run") //for _, ins := range fds.insMap { //log.Println("SaxoFixDS:", ins.Id, ins.Name) //fds.insPublisher.Publish(ins) //} //go fds.RunSave(16) fixApp := saxocgo.NewApp(fds.symbolMap, fds.conf.User, fds.conf.PassWord) cfgFile := fds.conf.CfgFile go fixApp.Run(cfgFile) for fixTick := range fixApp.Ch { m, err := fds.convMarket(fixTick) if err != nil { continue } ins, ok := fds.insMap[m.InsId] if !ok { log.Fatal("InsId is NOT in fds.insMap", m.InsId) } if m.Timestamp < ins.StartTime { log.Println("error: m.Timestamp < ins.StartTime:", m.Timestamp, ins.StartTime) continue } //ins.SetMk(m) fds.Save(m) } } func (fds *SaxoFixDS) convMarket(tick *saxocgo.FixTick) (*Market, error) { if tick.AskCount == 0 && tick.BidCount == 0 { return nil, errors.New("tick.AskCount==0 && tick.BidCount==0") } id := saxocgo.Symbol(tick.Symbol) symbol, ok := fds.symbolMap[id] if !ok { return nil, errors.New("tick.Symbol NOT in symbolMap: " + id) } symbol = strings.Replace(symbol, "/", "", 1) // EUR/USD ==> EURUSD //insId := market.SaxoPrefix + symbol insId, _ := markinfo.SymbolId(symbol) insId64 := int64(insId) _, ok = fds.insMap[insId64] if !ok { log.Fatal("InsId is NOT in fds.insMap", insId) } //mk := ins.GetMk() mk := &Market{} mk.InsId = insId64 mk.Type = IntSaxo for len(mk.Asks) < int(tick.AskCount) { mk.Asks = append(mk.Asks, PP{}) } for len(mk.Bids) < int(tick.BidCount) { mk.Bids = append(mk.Bids, PP{}) } for i := 0; i < int(tick.AskCount); i++ { if tick.AskPrice[i] == 0 { mk.Asks[i][0] = mk.Asks[i][0] } else { mk.Asks[i][0] = tick.AskPrice[i] } mk.Asks[i][1] = float64(tick.AskVolume[i]) } for i := 0; i < int(tick.BidCount); i++ { if tick.BidPrice[i] == 0 { mk.Bids[i][0] = mk.Bids[i][0] } else { mk.Bids[i][0] = tick.BidPrice[i] } mk.Bids[i][1] = float64(tick.BidVolume[i]) } lastPrice := 0. vol := 0. if len(mk.Bids) > 0 { lastPrice = mk.Bids[0][0] vol = mk.Bids[0][1] } if lastPrice == 0. { if len(mk.Asks) > 0 { lastPrice = mk.Asks[0][0] vol = mk.Asks[0][1] } } mk.LastPrice = lastPrice mk.LastVolume = vol mk.Timestamp = int64(tick.Time)*1000 + int64(tick.Millisecond) return mk, nil }