// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package market // 本文件实现fzm相关的操作: 获取历史数据和动态行情数据以及产品列表 import ( "encoding/json" "errors" "io/ioutil" "log" "os" "sync" "tickserver/framework/event" ) const fzmInsTable = "Fzm_inss" const ( BufMax = 2048 ) // FzmFx定义了交易所 // 集成了所有的数据源: lmax, easyforex, oanda, macoin等 // 使用数据库管理下载的历史数据 // 并在内存中缓存了最新的tick数据和candleK线数据 // 实现了Server服务的接口 type FzmEx struct { db *MyDB // 数据库 dsMap map[string]DataSource // 数据源map imu sync.Mutex // 保护insMap insMap map[string]*Instrument // 保存所有的instrument inssFname string // instruments 列表文件 } func makeTBMap() map[int]map[string]*TickBuffer { tbMap := make(map[int]map[string]*TickBuffer) for k, _ := range basePeriodSet { tbMap[k] = make(map[string]*TickBuffer) } return tbMap } func NewFzmEx(db *MyDB, inssFname string) *FzmEx { insMap, err := readInssTable(inssFname) if err != nil { //log.Println("NewFzmEx warning:", err) insMap = make(map[string]*Instrument) } //log.Println("NewFzmEx:", insMap) return &FzmEx{ dsMap: make(map[string]DataSource), insMap: insMap, db: db, inssFname: inssFname, } } func (ex *FzmEx) GetIns(insId string) *Instrument { ex.imu.Lock() defer ex.imu.Unlock() return ex.insMap[insId] } func (ex *FzmEx) setIns(ins *Instrument) { ex.imu.Lock() delete(ex.insMap, ins.Id) ex.insMap[ins.Id] = ins ex.imu.Unlock() // f := ex.onMarket() // ins.OnMarket().Attach(f) } func (ex *FzmEx) GetCacheCandles(insId string, period int) ([]Candle, error) { ds, ok := ex.dsMap[InsIdPrefix(insId)] if !ok { return nil, errors.New("GetCacheCandles error: insId is NOT in insMap: " + insId) } return ds.GetCacheCandles(insId, period) } func (ex *FzmEx) GetCacheTicks(insId string) ([]Tick, error) { ds, ok := ex.dsMap[InsIdPrefix(insId)] if !ok { return nil, errors.New("GetLastTicks error: insId is NOT in insMap: " + insId) } return ds.GetCacheTicks(insId) } func (ex *FzmEx) GetTimeList(insId, period, beginStr string) ([]string, error) { ds, ok := ex.dsMap[InsIdPrefix(insId)] if !ok { return nil, errors.New("GetLastTicks error: insId is NOT in insMap: " + insId) } return ds.GetTimeList(insId, period, beginStr) } func (ex *FzmEx) SaveAllTicks() { for _, ds := range ex.dsMap { ds.SaveAllTicks() } } func (ex *FzmEx) AddDS(name string, ds DataSource) { ex.dsMap[name] = ds ds.SubIns().Attach(func(v interface{}) error { ins, ok := v.(*Instrument) if !ok { log.Fatal("FzmEx.AddDS error. v is NOT *Instrument") } ex.setIns(ins) return nil }) } func readInssTable(inssFname string) (map[string]*Instrument, error) { f, err := os.Open(inssFname) if err != nil { return nil, err } defer f.Close() dec := json.NewDecoder(f) insMap := make(map[string]*Instrument) err = dec.Decode(&insMap) if err != nil { return nil, err } return insMap, nil } func (ex *FzmEx) writeInssTable() error { b, err := json.MarshalIndent(ex.insMap, "", " ") if err != nil { return err } ioutil.WriteFile(ex.inssFname, b, os.ModePerm) return nil } func (ex *FzmEx) Instruments() map[string]*Instrument { ex.writeInssTable() return ex.insMap } func (ex *FzmEx) SubMarket(insId string) *event.Event { ex.imu.Lock() ins, ok := ex.insMap[insId] ex.imu.Unlock() if !ok { return nil } //log.Println("FzmEx.SubMarket:", insId) return ins.mkPublisher.Event() }