// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package market // 本文件包含数据结构的定义以及通用函数的实现 import ( "bufio" "bytes" "compress/gzip" "encoding/binary" "errors" "fmt" "io" "log" "math" "os" "path" "sort" "strings" "sync" "time" "tickserver/framework/event" ) func DebugDelay(prefix, insId string, ts int64) { // for debug delay now := time.Now() d := int64(float64(now.UnixNano())*1e-6) - ts if d > 1000 { //log.Println(prefix, "delay > 1000ms", insId, d, getTime(ts)) } } const ( Lmax = "lmax" Oanda = "oanda" EasyForex = "easyforex" Ctp = "ctp" Fix = "fix" Dzh = "dzh" Saxo = "saxo" Btc = "btc" Polo = "polo" Bty = "bty" CFix = "cfix" Huobi = "huobi" Yunbi = "yunbi" Chbtc = "chbtc" General = "general" ) const ( LmaxPrefix = "lmax_" OandaPrefix = "oanda_" EasyForexPrefix = "easyforex_" CtpPrefix = "ctp_" FixPrefix = "fix_" DzhPrefix = "dzh_" SaxoPrefix = "saxo_" BtcPrefix = "btc_" PoloPrefix = "polo_" BtyPrefix = "bty_" CFixPrefix = "cfix_" HuobiPrefix = "huobi_" YunbiPrefix = "yunbi_" ChbtcPrefix = "chbtc_" ) // 期货交易所 const ( SHFE = "SHFE" // 上海期货交易所 CFFEX = "CFFEX" // 中国金融交易所 DEC = "DEC" // 大连商品交易所 CZCE = "CZCE" // 郑州商品交易所 ) // 证券交易所 const ( SHEX = "SH" // 上海证券交易所 SZEX = "SZ" // 深证证券交易所 ) // 上交所指数 const ( SHIND1 = "000" SHIND2 = "H" ) // 上交所股票 const ( SHA = "60" // 沪市A股 SHB = "900" // 沪市B股 ) // 上交所基金 const ( SHF1 = "50" // 封闭式基金 SHF2 = "510" // ETF SHF3 = "519" // 实时申赎货币基金 SHF4 = "511" // 交易型货币基金 ) // 上交所债券 const ( SHB1 = "010" // 国债 SHB2 = "130" // 地方政府债券 SHB3 = "12" // 企业债券 SHB4 = "11" // 可转换公司债券 SHB5 = "20" // 债券回购 // SHB6 = "126" // 分离债 // SHB7 = "121" // 资产支持证券 ) var SHSecurites = map[string]string{ SHIND1: "上交所指数", SHIND2: "上交所指数", SHA: "沪市A股", SHB: "沪市B股", SHF1: "上交所基金", SHF2: "上交所基金ETF", SHF3: "上交所基金", SHF4: "上交所基金", SHB1: "上交所债券", SHB2: "上交所债券", SHB3: "上交所债券", SHB4: "上交所债券", SHB5: "上交所债券", // SHB6, // SHB7, } // 3 9 xxxx 综合指数/成份指数 // 深交所指数 const ( SZIND = "399" ) // 0 0 xxxx A股证券3 xxxx A股A2权证7 xxxx A股增发8 xxxx A股A1权证9 xxxx A股转配 // 3 0 xxxx 创业板证券7 xxxx 创业板增发8 xxxx 创业板权证 // 深交所股票 const ( SZA = "000" // 深市A股 SZB = "200" // 深市B股 SME = "002" // 中小板 GEM = "30" // 创业板 ) // 1 7 xxxx 原有投资基金8 xxxx 证券投资基金 // 深交所基金 const ( SZF1 = "150" // SZF2 = "159" // ETF SZF3 = "16" // SZF5 = "18" // ) // 1 0 xxxx 国债现货1 xxxx 债券2 xxxx 可转换债券3 xxxx 国债回购 // 深交所债券 const ( SZB1 = "10" SZB2 = "11" SZB3 = "12" SZB4 = "13" ) var SZSecurites = map[string]string{ SZIND: "深交所指数", SZA: "深市A股", SZB: "深市B股", SME: "中小板", GEM: "创业板", SZF1: "深交所基金", SZF2: "深交所基金ETF", SZF3: "深交所基金", SZF5: "深交所基金", SZB1: "深交所债券", SZB2: "深交所债券", SZB3: "深交所债券", SZB4: "深交所债券", } const ( Custom = "custom" Forex = "forex" Futures = "futures" Securities = "securities" Btcs = "btcs" ) var TypeMap = map[string]string{ Lmax: "外汇", Ctp: "期货", Dzh: "证券", Btcs: "虚拟币", Saxo: "盛宝", } type Instrument struct { Id string `json:"insId"` // ID = 前缀+原始ID Name string `json:"name"` // 名称 Typ string `json:"type"` // 用来区分种类 ExId string `json:"exid"` // 交易所ID PriceInc float64 `json:"priceInc"` // 最小加价 Margin float64 `json:"margin"` // 保证金 StartTime int64 `json:"st"` // 上市时间 EndTime int64 `json:"et"` // 下市时间 mu sync.Mutex mk *Market mkPublisher event.EventPublisher // mk数据事件 } func (ins *Instrument) OnMarket() *event.Event { return ins.mkPublisher.Event() } func (ins *Instrument) SetMk(mk *Market) { ins.mu.Lock() ins.mk = mk ins.mu.Unlock() mk.SetIns(ins) ins.mkPublisher.Publish(mk) //异步 } func (ins *Instrument) GetMk() *Market { ins.mu.Lock() if ins.mk == nil { ins.mk = &Market{ ins: ins, InsId: ins.Id, } } mk := *ins.mk ins.mu.Unlock() return &mk } func (ins *Instrument) FmtPrice(f float64) string { if f == math.MaxFloat64 || math.IsInf(f, 0) || math.IsNaN(f) { return "-" } pt := ins.PriceInc if pt < 0.0000001 { return fmt.Sprintf("%.8f", f) } else if pt < 0.000001 { return fmt.Sprintf("%.7f", f) } else if pt < 0.00001 { return fmt.Sprintf("%.6f", f) } else if pt < 0.0001 { return fmt.Sprintf("%.5f", f) } else if pt < 0.001 { return fmt.Sprintf("%.4f", f) } else if pt < 0.01 { return fmt.Sprintf("%.3f", f) } else if pt < 0.1 { return fmt.Sprintf("%.2f", f) } else if pt < 1. { return fmt.Sprintf("%.1f", f) } else { return fmt.Sprintf("%d", int(f)) } } func InsIdPrefix(insId string) string { return strings.Split(insId, "_")[0] } func RealInsId(insId string) string { return strings.Split(insId, "_")[1] } // 实时数据订阅参数 type SubArgs struct { InsId string `json:"insId"` // 产品Id Code int64 `json:"code"` // 客户端代码 IsCancel bool `json:"cancel"` // 是否取消订阅 } type PP [2]float64 // [0]为价格, [1]为数量 type Tick struct { // InsId string `json:"insId"` // 产品ID Timestamp int64 `json:"ts"` // 时间戳 Price float64 `json:"last"` // 最新价 Volume float64 `json:"volume"` // 本次成交量(增量) Bid PP `json:"bids"` // 申买 Ask PP `json:"asks"` // 申卖 } // 市场深度 type Depth struct { Bids []PP `json:"bids"` // 申买 Asks []PP `json:"asks"` // 申卖 } // 实时行情数据 type Market struct { InsId string `json:"insId"` // 产品ID Timestamp int64 `json:"ts"` // 时间戳 Close float64 `json:"close"` // 昨日收盘价 Open float64 `json:"open"` // 今日开盘价 High float64 `json:"high"` // 当日最高价 Low float64 `json:"low"` // 当日最低价 AllVolume float64 `json:"allVolume"` // 当日成交量 AllAmount float64 `json:"allAmount"` // 成交额 LastPrice float64 `json:"last"` // 最新价 Volume float64 `json:"volume"` // 本次成交量(增量) Bids []PP `json:"bids"` // 申买 Asks []PP `json:"asks"` // 申卖 ins *Instrument } func (m *Market) Ins() *Instrument { return m.ins } func (m *Market) SetIns(ins *Instrument) { m.ins = ins } func Market2TickByBid(m *Market) *Tick { t := Market2Tick(m) if len(m.Bids) > 0 { t.Price = m.Bids[0][0] t.Volume = m.Bids[0][1] } return t } func Market2Tick(m *Market) *Tick { t := &Tick{ Timestamp: m.Timestamp, Price: m.LastPrice, Volume: m.Volume, } if len(m.Asks) > 0 { t.Ask = m.Asks[0] } if len(m.Bids) > 0 { t.Bid = m.Bids[0] } return t } func Market2Depth(mk *Market) *Depth { return &Depth{ mk.Bids, mk.Asks, } } func WriteBinary(w io.Writer, v interface{}) error { return binary.Write(w, binary.LittleEndian, v) } func WriteTickBinary(w io.Writer, t *Tick) error { return binary.Write(w, binary.LittleEndian, t) } func ReadTickBinary(r io.Reader) (*Tick, error) { t := &Tick{} err := binary.Read(r, binary.LittleEndian, t) if err != nil { return nil, err } return t, nil } type Candle struct { Timestamp int64 `json:"ts"` Open float64 `json:"open"` High float64 `json:"high"` Low float64 `json:"low"` Close float64 `json:"close"` RealVolums float64 `json:"realVol"` TickVolums float64 `json:"tickVol"` } func WriteCandleBinary(w io.Writer, c *Candle) error { return binary.Write(w, binary.LittleEndian, c) } func ReadCandleBinary(r io.Reader) (*Candle, error) { c := &Candle{} err := binary.Read(r, binary.LittleEndian, c) if err != nil { return nil, err } return c, nil } type CandleBuf struct { sync.Mutex Buf []Candle } type tickBuf struct { sync.Mutex buf []Tick } type SearchArgs struct { N int TS int64 } func (buf *tickBuf) add(t *Tick) bool { buf.Lock() defer buf.Unlock() buf.buf = append(buf.buf, *t) if len(buf.buf) >= 2000 { return true } else { return false } } func (buf *CandleBuf) add(c *Candle, period int) { buf.Lock() defer buf.Unlock() bufLen := 1000 if period == M1 { bufLen = 1500 } if len(buf.Buf) < bufLen { buf.Buf = append(buf.Buf, *c) } else { buf.Buf = buf.Buf[1:] buf.Buf = append(buf.Buf, *c) } } func (buf *tickBuf) leng() int { buf.Lock() defer buf.Unlock() return len(buf.buf) } func (buf *CandleBuf) leng() int { buf.Lock() defer buf.Unlock() return len(buf.Buf) } func (buf *CandleBuf) Last() *Candle { buf.Lock() defer buf.Unlock() if len(buf.Buf) == 0 { return nil } return &buf.Buf[len(buf.Buf)-1] } func (buf *CandleBuf) at(i int) *Candle { buf.Lock() defer buf.Unlock() if len(buf.Buf)-1 < i { return nil } c := buf.Buf[i] return &c } func (buf *CandleBuf) Search(args *SearchArgs) ([]Candle, error) { buf.Lock() defer buf.Unlock() p := len(buf.Buf) if p == 0 { return nil, ErrNotEnough } if args.TS == TimeNow { p -= 1 } else { p = sort.Search(len(buf.Buf), func(i int) bool { c := buf.Buf[i] if c.Timestamp >= args.TS { return true } return false }) } if p != len(buf.Buf) { // 在缓存中 n := args.N if n < 0 { n = -n if p >= n-1 { return buf.Buf[p-n+1 : p+1], nil } // 缓存中不够 args.N = p - n + 1 args.TS = buf.Buf[0].Timestamp return buf.Buf[:p], ErrNotEnough } if len(buf.Buf)-p > n { return buf.Buf[p : p+n], nil } p := len(buf.Buf) - n if p < 0 { p = 0 } return buf.Buf[p:], nil } return nil, ErrNotEnough } func getTime(ts int64) time.Time { if ts < 0 { return time.Now() } return time.Unix(ts/1000, (ts%1000)*1e6) } func GetTime(ts int64) time.Time { return getTime(ts) } var ErrNoDataBefore = errors.New("No Data before") func ZipBuf(v interface{}) ([]byte, error) { return zipBuf(v) } func zipBuf(v interface{}) ([]byte, error) { buf := &bytes.Buffer{} candles, ok := v.([]Candle) if ok { err := zipCBuf(buf, candles) if err != nil { return nil, err } return buf.Bytes(), nil } ticks, ok := v.([]Tick) if ok { err := zipTBuf(buf, ticks) if err != nil { return nil, err } return buf.Bytes(), nil } return nil, errors.New("zipBuf error: paramter v is NOT []Tick or []Candle") } func UnzipBufT(b []byte) ([]Tick, error) { return unzipBufT(b) } func unzipBufT(b []byte) ([]Tick, error) { r := bytes.NewReader(b) return unzipT(r) } func UnzipBufC(b []byte) ([]Candle, error) { return unzipBufC(b) } func unzipBufC(b []byte) ([]Candle, error) { r := bytes.NewReader(b) return unzipC(r) } func ZipTBuf(w io.Writer, ticks []Tick) error { return zipTBuf(w, ticks) } func zipTBuf(w io.Writer, ticks []Tick) error { gw := gzip.NewWriter(w) defer gw.Close() for _, t := range ticks { err := WriteTickBinary(gw, &t) if err != nil { return err } } return gw.Flush() } func ZipCBuf(w io.Writer, candles []Candle) error { return zipCBuf(w, candles) } func zipCBuf(w io.Writer, candles []Candle) error { gw := gzip.NewWriter(w) defer gw.Close() for _, c := range candles { err := WriteCandleBinary(gw, &c) if err != nil { return err } } return gw.Flush() } func UnzipC(r io.Reader) ([]Candle, error) { return unzipC(r) } func readR(r io.Reader) ([]Candle, error) { candles := []Candle{} for { c, err := ReadCandleBinary(r) if err != nil { if err != io.EOF { return candles, err } break } if math.IsNaN(c.Open*2) || math.IsInf(c.Open*2, 0) { continue } if math.IsNaN(c.High*2) || math.IsInf(c.High*2, 0) { continue } if math.IsNaN(c.Low*2) || math.IsInf(c.Low*2, 0) { continue } if math.IsNaN(c.Close*2) || math.IsInf(c.Close*2, 0) { continue } candles = append(candles, *c) } return candles, nil } func unzipC(r io.Reader) ([]Candle, error) { gr, err := gzip.NewReader(r) if err != nil { return nil, err } defer gr.Close() candles := []Candle{} for { c, err := ReadCandleBinary(gr) if err != nil { if err != io.EOF { return candles, err } break } if math.IsNaN(c.Open*2) || math.IsInf(c.Open*2, 0) { continue } if math.IsNaN(c.High*2) || math.IsInf(c.High*2, 0) { continue } if math.IsNaN(c.Low*2) || math.IsInf(c.Low*2, 0) { continue } if math.IsNaN(c.Close*2) || math.IsInf(c.Close*2, 0) { continue } candles = append(candles, *c) } return candles, nil } func UnzipT(r io.Reader) ([]Tick, error) { return unzipT(r) } func unzipT(r io.Reader) ([]Tick, error) { gr, err := gzip.NewReader(r) if err != nil { return nil, err } defer gr.Close() ticks := []Tick{} for { t, err := ReadTickBinary(gr) if err != nil { if err != io.EOF { return nil, err } break } if math.IsNaN(t.Price) || math.IsInf(t.Price*2, 0) { continue } ticks = append(ticks, *t) } return ticks, nil } var ErrNotEnough = errors.New("Not enough data") func readTickFile(fname string) ([]Tick, error) { f, err := os.Open(fname) if err != nil { log.Fatal(err.Error() + fname) return nil, err } defer f.Close() return unzipT(f) } func ReadNoZipCandleFile(fname string) ([]Candle, error) { f, err := os.Open(fname) if err != nil { return nil, err } defer f.Close() return readR(f) } func readCandleFile(fname string) ([]Candle, error) { f, err := os.Open(fname) if err != nil { return nil, err } defer f.Close() return unzipC(f) } func ReadCandleFile(fname string) ([]Candle, error) { return readCandleFile(fname) } func ReadTickFile(fname string) ([]Tick, error) { return readTickFile(fname) } func SaveTickEx(dataDir string, ts []Tick, insId string, bTruncate bool) (string, error) { if len(ts) == 0 { return "", errors.New("len(ts) == 0") } t := time.Unix(ts[0].Timestamp/1000, 0) dir := path.Join(dataDir, insId, fmt.Sprint(t.Year())) os.MkdirAll(dir, 0777) fname := path.Join(dir, fmt.Sprintf("%04d%02d%02d.tk.gz", t.Year(), t.Month(), t.Day())) var w io.WriteCloser var err error if bTruncate { w, err = os.Create(fname) } else { w, err = os.OpenFile(fname, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) } if err != nil { return "", errors.New("SaveTicks os.Create error:" + err.Error()) } defer w.Close() gw := gzip.NewWriter(w) bw := bufio.NewWriter(gw) for _, v := range ts { binary.Write(bw, binary.LittleEndian, v) } bw.Flush() gw.Close() return fname, nil } // 把之前文件中candles和新的合并 func combinEx(filename string, candles []Candle) ([]Candle, error) { buf, err := ReadCandleFile(filename) if err != nil { return candles, err } // n := len(buf) if candles[0].Timestamp == buf[n-1].Timestamp { buf[n-1].High = max(buf[n-1].High, candles[0].High) buf[n-1].Low = min(buf[n-1].Low, candles[0].Low) buf[n-1].Close = candles[0].Close buf[n-1].RealVolums += candles[0].RealVolums buf[n-1].TickVolums += candles[0].TickVolums candles = append(buf, candles[1:]...) } else { candles = append(buf, candles...) } return candles, nil }