123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package market
- // 本文件实现fzm相关的操作: 获取历史数据和动态行情数据以及产品列表
- import (
- "encoding/json"
- "errors"
- "io/ioutil"
- "log"
- "os"
- "sync"
- "tickserver/framework/event"
- )
- const fzmInsTable = "Fzm_inss"
- const (
- BufMax = 2048
- )
- // FzmFx定义了交易所
- // 集成了所有的数据源: lmax, easyforex, oanda, macoin等
- // 使用数据库管理下载的历史数据
- // 并在内存中缓存了最新的tick数据和candleK线数据
- // 实现了Server服务的接口
- type FzmEx struct {
- db *MyDB // 数据库
- dsMap map[string]DataSource // 数据源map
- imu sync.Mutex // 保护insMap
- insMap map[string]*Instrument // 保存所有的instrument
- inssFname string // instruments 列表文件
- }
- func makeTBMap() map[int]map[string]*TickBuffer {
- tbMap := make(map[int]map[string]*TickBuffer)
- for k, _ := range basePeriodSet {
- tbMap[k] = make(map[string]*TickBuffer)
- }
- return tbMap
- }
- func NewFzmEx(db *MyDB, inssFname string) *FzmEx {
- insMap, err := readInssTable(inssFname)
- if err != nil {
- //log.Println("NewFzmEx warning:", err)
- insMap = make(map[string]*Instrument)
- }
- //log.Println("NewFzmEx:", insMap)
- return &FzmEx{
- dsMap: make(map[string]DataSource),
- insMap: insMap,
- db: db,
- inssFname: inssFname,
- }
- }
- func (ex *FzmEx) GetIns(insId string) *Instrument {
- ex.imu.Lock()
- defer ex.imu.Unlock()
- return ex.insMap[insId]
- }
- func (ex *FzmEx) setIns(ins *Instrument) {
- ex.imu.Lock()
- delete(ex.insMap, ins.Id)
- ex.insMap[ins.Id] = ins
- ex.imu.Unlock()
- // f := ex.onMarket()
- // ins.OnMarket().Attach(f)
- }
- func (ex *FzmEx) GetCacheCandles(insId string, period int) ([]Candle, error) {
- ds, ok := ex.dsMap[InsIdPrefix(insId)]
- if !ok {
- return nil, errors.New("GetCacheCandles error: insId is NOT in insMap: " + insId)
- }
- return ds.GetCacheCandles(insId, period)
- }
- func (ex *FzmEx) GetCacheTicks(insId string) ([]Tick, error) {
- ds, ok := ex.dsMap[InsIdPrefix(insId)]
- if !ok {
- return nil, errors.New("GetLastTicks error: insId is NOT in insMap: " + insId)
- }
- return ds.GetCacheTicks(insId)
- }
- func (ex *FzmEx) GetTimeList(insId, period, beginStr string) ([]string, error) {
- ds, ok := ex.dsMap[InsIdPrefix(insId)]
- if !ok {
- return nil, errors.New("GetLastTicks error: insId is NOT in insMap: " + insId)
- }
- return ds.GetTimeList(insId, period, beginStr)
- }
- func (ex *FzmEx) SaveAllTicks() {
- for _, ds := range ex.dsMap {
- ds.SaveAllTicks()
- }
- }
- func (ex *FzmEx) AddDS(name string, ds DataSource) {
- ex.dsMap[name] = ds
- ds.SubIns().Attach(func(v interface{}) error {
- ins, ok := v.(*Instrument)
- if !ok {
- log.Fatal("FzmEx.AddDS error. v is NOT *Instrument")
- }
- ex.setIns(ins)
- return nil
- })
- }
- func readInssTable(inssFname string) (map[string]*Instrument, error) {
- f, err := os.Open(inssFname)
- if err != nil {
- return nil, err
- }
- defer f.Close()
- dec := json.NewDecoder(f)
- insMap := make(map[string]*Instrument)
- err = dec.Decode(&insMap)
- if err != nil {
- return nil, err
- }
- return insMap, nil
- }
- func (ex *FzmEx) writeInssTable() error {
- b, err := json.MarshalIndent(ex.insMap, "", " ")
- if err != nil {
- return err
- }
- ioutil.WriteFile(ex.inssFname, b, os.ModePerm)
- return nil
- }
- func (ex *FzmEx) Instruments() map[string]*Instrument {
- ex.writeInssTable()
- return ex.insMap
- }
- func (ex *FzmEx) SubMarket(insId string) *event.Event {
- ex.imu.Lock()
- ins, ok := ex.insMap[insId]
- ex.imu.Unlock()
- if !ok {
- return nil
- }
- //log.Println("FzmEx.SubMarket:", insId)
- return ins.mkPublisher.Event()
- }
|