|
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package market
- // 本文件包含数据结构的定义以及通用函数的实现
- import (
- "bufio"
- "bytes"
- "compress/gzip"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "log"
- "math"
- "os"
- "path"
- "sort"
- "strings"
- "sync"
- "time"
- "tickserver/framework/event"
- )
// DebugDelay is a debugging hook that measures how far the millisecond
// timestamp ts lags behind the current wall clock. The log statement that
// used to report delays above 1000 ms is disabled, so the function currently
// has no observable effect; prefix and insId are kept for that log line.
func DebugDelay(prefix, insId string, ts int64) {
	nowMs := int64(float64(time.Now().UnixNano()) * 1e-6)
	if delay := nowMs - ts; delay > 1000 {
		// Reporting is intentionally disabled:
		// log.Println(prefix, "delay > 1000ms", insId, delay, getTime(ts))
	}
}
// Data-source identifiers.
const (
	Lmax      = "lmax"
	Oanda     = "oanda"
	EasyForex = "easyforex"
	Ctp       = "ctp"
	Fix       = "fix"
	Dzh       = "dzh"
	Saxo      = "saxo"
	Btc       = "btc"
	Polo      = "polo"
	Bty       = "bty"
	CFix      = "cfix"
	Huobi     = "huobi"
	Yunbi     = "yunbi"
	Chbtc     = "chbtc"
	General   = "general"
)

// Instrument-ID prefixes: each is the matching data-source identifier
// followed by "_". General has no prefix constant.
const (
	LmaxPrefix      = "lmax_"
	OandaPrefix     = "oanda_"
	EasyForexPrefix = "easyforex_"
	CtpPrefix       = "ctp_"
	FixPrefix       = "fix_"
	DzhPrefix       = "dzh_"
	SaxoPrefix      = "saxo_"
	BtcPrefix       = "btc_"
	PoloPrefix      = "polo_"
	BtyPrefix       = "bty_"
	CFixPrefix      = "cfix_"
	HuobiPrefix     = "huobi_"
	YunbiPrefix     = "yunbi_"
	ChbtcPrefix     = "chbtc_"
)
// Futures exchanges.
const (
	SHFE  = "SHFE"  // Shanghai Futures Exchange
	CFFEX = "CFFEX" // China Financial Futures Exchange
	DEC   = "DEC"   // Dalian Commodity Exchange. NOTE(review): usually abbreviated "DCE"; confirm "DEC" is intended.
	CZCE  = "CZCE"  // Zhengzhou Commodity Exchange
)

// Stock exchanges.
const (
	SHEX = "SH" // Shanghai Stock Exchange
	SZEX = "SZ" // Shenzhen Stock Exchange
)
// Shanghai Stock Exchange (SSE) indices.
// NOTE(review): the values in these groups are presumably leading digits of
// security codes; confirm against the code that consults SHSecurites.
const (
	SHIND1 = "000"
	SHIND2 = "H"
)

// SSE stocks.
const (
	SHA = "60"  // Shanghai A-shares
	SHB = "900" // Shanghai B-shares
)

// SSE funds.
const (
	SHF1 = "50"  // closed-end funds
	SHF2 = "510" // ETF
	SHF3 = "519" // money-market funds with real-time subscription/redemption
	SHF4 = "511" // exchange-traded money-market funds
)

// SSE bonds.
const (
	SHB1 = "010" // treasury bonds
	SHB2 = "130" // local government bonds
	SHB3 = "12"  // enterprise bonds
	SHB4 = "11"  // convertible corporate bonds
	SHB5 = "20"  // bond repos
	// SHB6 = "126" // detachable bonds
	// SHB7 = "121" // asset-backed securities
)

// SHSecurites maps each SSE code constant above to its category label.
var SHSecurites = map[string]string{
	SHIND1: "上交所指数",
	SHIND2: "上交所指数",
	SHA:    "沪市A股",
	SHB:    "沪市B股",
	SHF1:   "上交所基金",
	SHF2:   "上交所基金ETF",
	SHF3:   "上交所基金",
	SHF4:   "上交所基金",
	SHB1:   "上交所债券",
	SHB2:   "上交所债券",
	SHB3:   "上交所债券",
	SHB4:   "上交所债券",
	SHB5:   "上交所债券",
	// SHB6,
	// SHB7,
}
// 39xxxx: composite index / component index.
// Shenzhen Stock Exchange (SZSE) indices.
const (
	SZIND = "399"
)

// 00xxxx: A-share securities; 03xxxx: A-share A2 warrants; 07xxxx: A-share
// additional issuance; 08xxxx: A-share A1 warrants; 09xxxx: A-share allotment transfer.
// 30xxxx: ChiNext securities; 37xxxx: ChiNext additional issuance; 38xxxx: ChiNext warrants.
// SZSE stocks.
const (
	SZA = "000" // Shenzhen A-shares
	SZB = "200" // Shenzhen B-shares
	SME = "002" // SME board
	GEM = "30"  // ChiNext (growth enterprise market)
)

// 17xxxx: legacy investment funds; 18xxxx: securities investment funds.
// SZSE funds.
const (
	SZF1 = "150" // (category not documented in the source)
	SZF2 = "159" // ETF
	SZF3 = "16"  // (category not documented in the source)
	SZF5 = "18"  // (category not documented in the source)
)

// 10xxxx: treasury bond spot; 11xxxx: bonds; 12xxxx: convertible bonds;
// 13xxxx: treasury bond repos.
// SZSE bonds.
const (
	SZB1 = "10"
	SZB2 = "11"
	SZB3 = "12"
	SZB4 = "13"
)

// SZSecurites maps each SZSE code constant above to its category label.
var SZSecurites = map[string]string{
	SZIND: "深交所指数",
	SZA:   "深市A股",
	SZB:   "深市B股",
	SME:   "中小板",
	GEM:   "创业板",
	SZF1:  "深交所基金",
	SZF2:  "深交所基金ETF",
	SZF3:  "深交所基金",
	SZF5:  "深交所基金",
	SZB1:  "深交所债券",
	SZB2:  "深交所债券",
	SZB3:  "深交所债券",
	SZB4:  "深交所债券",
}
// Instrument category identifiers.
const (
	Custom     = "custom"
	Forex      = "forex"
	Futures    = "futures"
	Securities = "securities"
	Btcs       = "btcs"
)

// TypeMap maps an identifier to its display label.
// NOTE(review): keys mix data-source names (Lmax, Ctp, Dzh, Saxo) with the
// category constant Btcs; confirm this is intended.
var TypeMap = map[string]string{
	Lmax: "外汇",
	Ctp:  "期货",
	Dzh:  "证券",
	Btcs: "虚拟币",
	Saxo: "盛宝",
}
// Instrument describes a product and holds its most recent market snapshot
// (mk, guarded by mu) together with the publisher used to announce updates.
type Instrument struct {
	Id        string  `json:"insId"`    // ID = prefix + original ID
	Name      string  `json:"name"`     // name
	Typ       string  `json:"type"`     // used to distinguish the category
	ExId      string  `json:"exid"`     // exchange ID
	PriceInc  float64 `json:"priceInc"` // minimum price increment
	Margin    float64 `json:"margin"`   // margin
	StartTime int64   `json:"st"`       // listing time
	EndTime   int64   `json:"et"`       // delisting time

	mu          sync.Mutex           // guards mk
	mk          *Market              // latest market snapshot; nil until first set or read
	mkPublisher event.EventPublisher // market-data event publisher
}
// OnMarket returns the event exposed by the instrument's market-data
// publisher.
func (ins *Instrument) OnMarket() *event.Event {
	return ins.mkPublisher.Event()
}
- func (ins *Instrument) SetMk(mk *Market) {
- ins.mu.Lock()
- ins.mk = mk
- ins.mu.Unlock()
- mk.SetIns(ins)
- ins.mkPublisher.Publish(mk) //异步
- }
- func (ins *Instrument) GetMk() *Market {
- ins.mu.Lock()
- if ins.mk == nil {
- ins.mk = &Market{
- ins: ins,
- InsId: ins.Id,
- }
- }
- mk := *ins.mk
- ins.mu.Unlock()
- return &mk
- }
- func (ins *Instrument) FmtPrice(f float64) string {
- if f == math.MaxFloat64 || math.IsInf(f, 0) || math.IsNaN(f) {
- return "-"
- }
- pt := ins.PriceInc
- if pt < 0.0000001 {
- return fmt.Sprintf("%.8f", f)
- } else if pt < 0.000001 {
- return fmt.Sprintf("%.7f", f)
- } else if pt < 0.00001 {
- return fmt.Sprintf("%.6f", f)
- } else if pt < 0.0001 {
- return fmt.Sprintf("%.5f", f)
- } else if pt < 0.001 {
- return fmt.Sprintf("%.4f", f)
- } else if pt < 0.01 {
- return fmt.Sprintf("%.3f", f)
- } else if pt < 0.1 {
- return fmt.Sprintf("%.2f", f)
- } else if pt < 1. {
- return fmt.Sprintf("%.1f", f)
- } else {
- return fmt.Sprintf("%d", int(f))
- }
- }
// InsIdPrefix returns the part of insId before the first "_". When insId
// contains no "_", the whole string is returned.
func InsIdPrefix(insId string) string {
	sep := strings.Index(insId, "_")
	if sep < 0 {
		return insId
	}
	return insId[:sep]
}
// RealInsId returns the segment of insId between the first and the second
// "_" (for "lmax_EURUSD" that is "EURUSD"). It returns "" when insId
// contains no "_"; this case used to panic with an index-out-of-range.
func RealInsId(insId string) string {
	parts := strings.Split(insId, "_")
	if len(parts) < 2 {
		return ""
	}
	return parts[1]
}
// SubArgs holds the parameters of a real-time data subscription.
type SubArgs struct {
	InsId    string `json:"insId"`  // product ID
	Code     int64  `json:"code"`   // client code
	IsCancel bool   `json:"cancel"` // whether this cancels the subscription
}
// PP is a price level: [0] is the price, [1] is the quantity.
type PP [2]float64

// Tick is a single quote/trade record. All fields are fixed-size, so a Tick
// can be encoded directly with encoding/binary.
type Tick struct {
	// InsId string `json:"insId"` // product ID
	Timestamp int64   `json:"ts"`     // timestamp
	Price     float64 `json:"last"`   // last price
	Volume    float64 `json:"volume"` // volume of this trade (incremental)
	Bid       PP      `json:"bids"`   // bid
	Ask       PP      `json:"asks"`   // ask
}
// Depth is the market depth: the bid and ask price levels.
type Depth struct {
	Bids []PP `json:"bids"` // bids
	Asks []PP `json:"asks"` // asks
}
// Market is a real-time market data snapshot for one instrument.
type Market struct {
	InsId     string  `json:"insId"`     // product ID
	Timestamp int64   `json:"ts"`        // timestamp
	Close     float64 `json:"close"`     // previous day's closing price
	Open      float64 `json:"open"`      // today's opening price
	High      float64 `json:"high"`      // day high
	Low       float64 `json:"low"`       // day low
	AllVolume float64 `json:"allVolume"` // volume traded today
	AllAmount float64 `json:"allAmount"` // turnover
	LastPrice float64 `json:"last"`      // last price
	Volume    float64 `json:"volume"`    // volume of this trade (incremental)
	Bids      []PP    `json:"bids"`      // bids
	Asks      []PP    `json:"asks"`      // asks

	ins *Instrument // owning instrument; unexported, so not part of the JSON form
}
// Ins returns the instrument this snapshot is attached to, or nil when none
// has been set.
func (m *Market) Ins() *Instrument {
	return m.ins
}
// SetIns attaches the snapshot to ins.
func (m *Market) SetIns(ins *Instrument) {
	m.ins = ins
}
- func Market2TickByBid(m *Market) *Tick {
- t := Market2Tick(m)
- if len(m.Bids) > 0 {
- t.Price = m.Bids[0][0]
- t.Volume = m.Bids[0][1]
- }
- return t
- }
- func Market2Tick(m *Market) *Tick {
- t := &Tick{
- Timestamp: m.Timestamp,
- Price: m.LastPrice,
- Volume: m.Volume,
- }
- if len(m.Asks) > 0 {
- t.Ask = m.Asks[0]
- }
- if len(m.Bids) > 0 {
- t.Bid = m.Bids[0]
- }
- return t
- }
- func Market2Depth(mk *Market) *Depth {
- return &Depth{
- mk.Bids,
- mk.Asks,
- }
- }
// WriteBinary writes v to w in little-endian binary form using
// encoding/binary, and returns any encoding or write error.
func WriteBinary(w io.Writer, v interface{}) error {
	return binary.Write(w, binary.LittleEndian, v)
}
// WriteTickBinary writes t to w in little-endian binary form.
func WriteTickBinary(w io.Writer, t *Tick) error {
	return binary.Write(w, binary.LittleEndian, t)
}
- func ReadTickBinary(r io.Reader) (*Tick, error) {
- t := &Tick{}
- err := binary.Read(r, binary.LittleEndian, t)
- if err != nil {
- return nil, err
- }
- return t, nil
- }
// Candle is one OHLC bar. All fields are fixed-size, so a Candle can be
// encoded directly with encoding/binary.
type Candle struct {
	Timestamp  int64   `json:"ts"`      // bar timestamp
	Open       float64 `json:"open"`    // opening price
	High       float64 `json:"high"`    // highest price
	Low        float64 `json:"low"`     // lowest price
	Close      float64 `json:"close"`   // closing price
	RealVolums float64 `json:"realVol"` // real volume
	TickVolums float64 `json:"tickVol"` // tick volume
}
// WriteCandleBinary writes c to w in little-endian binary form.
func WriteCandleBinary(w io.Writer, c *Candle) error {
	return binary.Write(w, binary.LittleEndian, c)
}
- func ReadCandleBinary(r io.Reader) (*Candle, error) {
- c := &Candle{}
- err := binary.Read(r, binary.LittleEndian, c)
- if err != nil {
- return nil, err
- }
- return c, nil
- }
// CandleBuf is a mutex-guarded in-memory buffer of candles. Its methods take
// the embedded mutex; code touching Buf directly must lock it itself.
type CandleBuf struct {
	sync.Mutex
	Buf []Candle
}

// tickBuf is a mutex-guarded in-memory buffer of ticks.
type tickBuf struct {
	sync.Mutex
	buf []Tick
}

// SearchArgs are the parameters of CandleBuf.Search.
type SearchArgs struct {
	N  int   // number of candles wanted; a negative value means "ending at TS"
	TS int64 // reference timestamp, or TimeNow for the newest candle
}
- func (buf *tickBuf) add(t *Tick) bool {
- buf.Lock()
- defer buf.Unlock()
- buf.buf = append(buf.buf, *t)
- if len(buf.buf) >= 2000 {
- return true
- } else {
- return false
- }
- }
- func (buf *CandleBuf) add(c *Candle, period int) {
- buf.Lock()
- defer buf.Unlock()
- bufLen := 1000
- if period == M1 {
- bufLen = 1500
- }
- if len(buf.Buf) < bufLen {
- buf.Buf = append(buf.Buf, *c)
- } else {
- buf.Buf = buf.Buf[1:]
- buf.Buf = append(buf.Buf, *c)
- }
- }
- func (buf *tickBuf) leng() int {
- buf.Lock()
- defer buf.Unlock()
- return len(buf.buf)
- }
- func (buf *CandleBuf) leng() int {
- buf.Lock()
- defer buf.Unlock()
- return len(buf.Buf)
- }
- func (buf *CandleBuf) Last() *Candle {
- buf.Lock()
- defer buf.Unlock()
- if len(buf.Buf) == 0 {
- return nil
- }
- return &buf.Buf[len(buf.Buf)-1]
- }
- func (buf *CandleBuf) at(i int) *Candle {
- buf.Lock()
- defer buf.Unlock()
- if len(buf.Buf)-1 < i {
- return nil
- }
- c := buf.Buf[i]
- return &c
- }
- func (buf *CandleBuf) Search(args *SearchArgs) ([]Candle, error) {
- buf.Lock()
- defer buf.Unlock()
- p := len(buf.Buf)
- if p == 0 {
- return nil, ErrNotEnough
- }
- if args.TS == TimeNow {
- p -= 1
- } else {
- p = sort.Search(len(buf.Buf), func(i int) bool {
- c := buf.Buf[i]
- if c.Timestamp >= args.TS {
- return true
- }
- return false
- })
- }
- if p != len(buf.Buf) { // 在缓存中
- n := args.N
- if n < 0 {
- n = -n
- if p >= n-1 {
- return buf.Buf[p-n+1 : p+1], nil
- }
- // 缓存中不够
- args.N = p - n + 1
- args.TS = buf.Buf[0].Timestamp
- return buf.Buf[:p], ErrNotEnough
- }
- if len(buf.Buf)-p > n {
- return buf.Buf[p : p+n], nil
- }
- p := len(buf.Buf) - n
- if p < 0 {
- p = 0
- }
- return buf.Buf[p:], nil
- }
- return nil, ErrNotEnough
- }
// getTime converts a Unix timestamp in milliseconds to a time.Time. A
// negative ts yields the current time.
func getTime(ts int64) time.Time {
	if ts < 0 {
		return time.Now()
	}
	sec, ms := ts/1000, ts%1000
	return time.Unix(sec, ms*1e6)
}
// GetTime is the exported form of getTime: it converts a Unix timestamp in
// milliseconds to a time.Time, returning the current time for a negative ts.
func GetTime(ts int64) time.Time {
	return getTime(ts)
}
// ErrNoDataBefore is a sentinel error; it is not used within this section of
// the file.
var ErrNoDataBefore = errors.New("No Data before")
// ZipBuf is the exported form of zipBuf: it gzip-compresses v, which must be
// a []Candle or a []Tick, and returns the compressed bytes.
func ZipBuf(v interface{}) ([]byte, error) {
	return zipBuf(v)
}
- func zipBuf(v interface{}) ([]byte, error) {
- buf := &bytes.Buffer{}
- candles, ok := v.([]Candle)
- if ok {
- err := zipCBuf(buf, candles)
- if err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- ticks, ok := v.([]Tick)
- if ok {
- err := zipTBuf(buf, ticks)
- if err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- return nil, errors.New("zipBuf error: paramter v is NOT []Tick or []Candle")
- }
// UnzipBufT is the exported form of unzipBufT: it decodes gzip-compressed
// ticks from b.
func UnzipBufT(b []byte) ([]Tick, error) {
	return unzipBufT(b)
}
- func unzipBufT(b []byte) ([]Tick, error) {
- r := bytes.NewReader(b)
- return unzipT(r)
- }
// UnzipBufC is the exported form of unzipBufC: it decodes gzip-compressed
// candles from b.
func UnzipBufC(b []byte) ([]Candle, error) {
	return unzipBufC(b)
}
- func unzipBufC(b []byte) ([]Candle, error) {
- r := bytes.NewReader(b)
- return unzipC(r)
- }
// ZipTBuf is the exported form of zipTBuf: it writes ticks to w as a gzip
// stream of little-endian binary records.
func ZipTBuf(w io.Writer, ticks []Tick) error {
	return zipTBuf(w, ticks)
}
- func zipTBuf(w io.Writer, ticks []Tick) error {
- gw := gzip.NewWriter(w)
- defer gw.Close()
- for _, t := range ticks {
- err := WriteTickBinary(gw, &t)
- if err != nil {
- return err
- }
- }
- return gw.Flush()
- }
// ZipCBuf is the exported form of zipCBuf: it writes candles to w as a gzip
// stream of little-endian binary records.
func ZipCBuf(w io.Writer, candles []Candle) error {
	return zipCBuf(w, candles)
}
- func zipCBuf(w io.Writer, candles []Candle) error {
- gw := gzip.NewWriter(w)
- defer gw.Close()
- for _, c := range candles {
- err := WriteCandleBinary(gw, &c)
- if err != nil {
- return err
- }
- }
- return gw.Flush()
- }
// UnzipC is the exported form of unzipC: it decodes gzip-compressed candles
// from r.
func UnzipC(r io.Reader) ([]Candle, error) {
	return unzipC(r)
}
- func readR(r io.Reader) ([]Candle, error) {
- candles := []Candle{}
- for {
- c, err := ReadCandleBinary(r)
- if err != nil {
- if err != io.EOF {
- return candles, err
- }
- break
- }
- if math.IsNaN(c.Open*2) || math.IsInf(c.Open*2, 0) {
- continue
- }
- if math.IsNaN(c.High*2) || math.IsInf(c.High*2, 0) {
- continue
- }
- if math.IsNaN(c.Low*2) || math.IsInf(c.Low*2, 0) {
- continue
- }
- if math.IsNaN(c.Close*2) || math.IsInf(c.Close*2, 0) {
- continue
- }
- candles = append(candles, *c)
- }
- return candles, nil
- }
- func unzipC(r io.Reader) ([]Candle, error) {
- gr, err := gzip.NewReader(r)
- if err != nil {
- return nil, err
- }
- defer gr.Close()
- candles := []Candle{}
- for {
- c, err := ReadCandleBinary(gr)
- if err != nil {
- if err != io.EOF {
- return candles, err
- }
- break
- }
- if math.IsNaN(c.Open*2) || math.IsInf(c.Open*2, 0) {
- continue
- }
- if math.IsNaN(c.High*2) || math.IsInf(c.High*2, 0) {
- continue
- }
- if math.IsNaN(c.Low*2) || math.IsInf(c.Low*2, 0) {
- continue
- }
- if math.IsNaN(c.Close*2) || math.IsInf(c.Close*2, 0) {
- continue
- }
- candles = append(candles, *c)
- }
- return candles, nil
- }
// UnzipT is the exported form of unzipT: it decodes gzip-compressed ticks
// from r.
func UnzipT(r io.Reader) ([]Tick, error) {
	return unzipT(r)
}
- func unzipT(r io.Reader) ([]Tick, error) {
- gr, err := gzip.NewReader(r)
- if err != nil {
- return nil, err
- }
- defer gr.Close()
- ticks := []Tick{}
- for {
- t, err := ReadTickBinary(gr)
- if err != nil {
- if err != io.EOF {
- return nil, err
- }
- break
- }
- if math.IsNaN(t.Price) || math.IsInf(t.Price*2, 0) {
- continue
- }
- ticks = append(ticks, *t)
- }
- return ticks, nil
- }
// ErrNotEnough is returned by CandleBuf.Search when the buffer cannot
// satisfy the request.
var ErrNotEnough = errors.New("Not enough data")
- func readTickFile(fname string) ([]Tick, error) {
- f, err := os.Open(fname)
- if err != nil {
- log.Fatal(err.Error() + fname)
- return nil, err
- }
- defer f.Close()
- return unzipT(f)
- }
- func ReadNoZipCandleFile(fname string) ([]Candle, error) {
- f, err := os.Open(fname)
- if err != nil {
- return nil, err
- }
- defer f.Close()
- return readR(f)
- }
- func readCandleFile(fname string) ([]Candle, error) {
- f, err := os.Open(fname)
- if err != nil {
- return nil, err
- }
- defer f.Close()
- return unzipC(f)
- }
// ReadCandleFile is the exported form of readCandleFile: it reads the
// gzip-compressed candle file fname.
func ReadCandleFile(fname string) ([]Candle, error) {
	return readCandleFile(fname)
}
// ReadTickFile is the exported form of readTickFile: it reads the
// gzip-compressed tick file fname.
func ReadTickFile(fname string) ([]Tick, error) {
	return readTickFile(fname)
}
- func SaveTickEx(dataDir string, ts []Tick, insId string, bTruncate bool) (string, error) {
- if len(ts) == 0 {
- return "", errors.New("len(ts) == 0")
- }
- t := time.Unix(ts[0].Timestamp/1000, 0)
- dir := path.Join(dataDir, insId, fmt.Sprint(t.Year()))
- os.MkdirAll(dir, 0777)
- fname := path.Join(dir, fmt.Sprintf("%04d%02d%02d.tk.gz", t.Year(), t.Month(), t.Day()))
- var w io.WriteCloser
- var err error
- if bTruncate {
- w, err = os.Create(fname)
- } else {
- w, err = os.OpenFile(fname, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
- }
- if err != nil {
- return "", errors.New("SaveTicks os.Create error:" + err.Error())
- }
- defer w.Close()
- gw := gzip.NewWriter(w)
- bw := bufio.NewWriter(gw)
- for _, v := range ts {
- binary.Write(bw, binary.LittleEndian, v)
- }
- bw.Flush()
- gw.Close()
- return fname, nil
- }
- // 把之前文件中candles和新的合并
- func combinEx(filename string, candles []Candle) ([]Candle, error) {
- buf, err := ReadCandleFile(filename)
- if err != nil {
- return candles, err
- }
- //
- n := len(buf)
- if candles[0].Timestamp == buf[n-1].Timestamp {
- buf[n-1].High = max(buf[n-1].High, candles[0].High)
- buf[n-1].Low = min(buf[n-1].Low, candles[0].Low)
- buf[n-1].Close = candles[0].Close
- buf[n-1].RealVolums += candles[0].RealVolums
- buf[n-1].TickVolums += candles[0].TickVolums
- candles = append(buf, candles[1:]...)
- } else {
- candles = append(buf, candles...)
- }
- return candles, nil
- }
|