123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- package client
- import "tickserver/framework/event"
- import "sync"
- import "errors"
- import "time"
- import "strings"
- import "io"
- import "log"
- import "math"
- import "compress/gzip"
- import "encoding/binary"
- import "os"
- import "fmt"
- const (
- LmaxPrefix = "lmax_"
- OandaPrefix = "oanda_"
- EasyForexPrefix = "easyforex_"
- CtpPrefix = "ctp_"
- FixPrefix = "fix_"
- DzhPrefix = "dzh_"
- SaxoPrefix = "saxo_"
- )
- // 周期定义
- const (
- TK = 0
- S1 = 1
- S2 = 2
- S3 = 3
- S4 = 4
- S5 = 5
- S15 = 15
- S30 = 30
- M1 = 1 * 60
- M2 = 2 * 60
- M3 = 3 * 60
- M4 = 4 * 60
- M5 = 5 * 60
- M15 = 15 * 60
- M30 = 30 * 60
- H1 = 60 * 60
- H2 = 2 * 60 * 60
- H4 = 4 * 60 * 60
- D1 = 24 * 3600
- W1 = 7 * 24 * 3600
- MN1 = 30 * 24 * 3600
- )
- var PeriodNameMap = map[int]string{
- TK: "Tick",
- S1: "S1",
- S2: "S2",
- S3: "S3",
- S4: "S4",
- S5: "S5",
- S15: "S15",
- S30: "S30",
- M1: "M1",
- M2: "M2",
- M3: "M3",
- M4: "M4",
- M5: "M5",
- M15: "M15",
- M30: "M30",
- H1: "H1",
- H2: "H2",
- H4: "H4",
- D1: "D1",
- W1: "W1",
- MN1: "MN1",
- }
- var ErrNotEnough = errors.New("Not enough data")
- var TimeNow = int64(-1)
- type PP [2]float64 // [0]为价格, [1]为数量
- // 实时行情数据
- type Market struct {
- InsId string `json:"insId"` // 产品ID
- Timestamp int64 `json:"ts"` // 时间戳
- Close float64 `json:"close"` // 昨日收盘价
- Open float64 `json:"open"` // 今日开盘价
- High float64 `json:"high"` // 当日最高价
- Low float64 `json:"low"` // 当日最低价
- AllVolume float64 `json:"allVolume"` // 当日成交量
- AllAmount float64 `json:"allAmount"` // 成交额
- LastPrice float64 `json:"last"` // 最新价
- Volume float64 `json:"volume"` // 本次成交量(增量)
- Bids []PP `json:"bids"` // 申买
- Asks []PP `json:"asks"` // 申卖
- ins *Instrument
- }
- type Instrument struct {
- Id string `json:"insId"` // ID = 前缀+原始ID
- Name string `json:"name"` // 名称
- Typ string `json:"type"` // 用来区分种类
- ExId string `json:"exid"` // 交易所ID
- PriceInc float64 `json:"priceInc"` // 最小加价
- Margin float64 `json:"margin"` // 保证金
- StartTime int64 `json:"st"` // 上市时间
- EndTime int64 `json:"et"` // 下市时间
- mu sync.Mutex
- mk *Market
- mkPublisher event.EventPublisher // mk数据事件
- }
- func (ins *Instrument) OnMarket() *event.Event {
- return ins.mkPublisher.Event()
- }
- func (ins *Instrument) SetMk(mk *Market) {
- ins.mu.Lock()
- ins.mk = mk
- ins.mu.Unlock()
- mk.SetIns(ins)
- ins.mkPublisher.Publish(mk) //异步
- }
- func (ins *Instrument) GetMk() *Market {
- ins.mu.Lock()
- if ins.mk == nil {
- ins.mk = &Market{
- ins: ins,
- InsId: ins.Id,
- }
- }
- mk := *ins.mk
- ins.mu.Unlock()
- return &mk
- }
- func (ins *Instrument) FmtPrice(f float64) string {
- if f == math.MaxFloat64 || math.IsInf(f, 0) || math.IsNaN(f) {
- return "-"
- }
- pt := ins.PriceInc
- if pt < 0.0000001 {
- return fmt.Sprintf("%.8f", f)
- } else if pt < 0.000001 {
- return fmt.Sprintf("%.7f", f)
- } else if pt < 0.00001 {
- return fmt.Sprintf("%.6f", f)
- } else if pt < 0.0001 {
- return fmt.Sprintf("%.5f", f)
- } else if pt < 0.001 {
- return fmt.Sprintf("%.4f", f)
- } else if pt < 0.01 {
- return fmt.Sprintf("%.3f", f)
- } else if pt < 0.1 {
- return fmt.Sprintf("%.2f", f)
- } else if pt < 1. {
- return fmt.Sprintf("%.1f", f)
- } else {
- return fmt.Sprintf("%d", int(f))
- }
- }
- type DBConf struct {
- DBDriver string // mysql, sqlite
- DSN string // dsn = fmt.Sprintf("root:fzm@1001@/%s?charset=%s", dbName, "utf8")
- DBName string // fzmdb
- }
- // 历史数据应答参数
- type HistoryReply struct {
- Url string `json:"url"`
- St int64 `json:"st"`
- Et int64 `json:"et"`
- N int `json:"n"`
- }
- type Tick struct {
- // InsId string `json:"insId"` // 产品ID
- Timestamp int64 `json:"ts"` // 时间戳
- Price float64 `json:"last"` // 最新价
- Volume float64 `json:"volume"` // 本次成交量(增量)
- Bid PP `json:"bids"` // 申买
- Ask PP `json:"asks"` // 申卖
- }
- type Candle struct {
- Timestamp int64 `json:"ts"`
- Open float64 `json:"open"`
- High float64 `json:"high"`
- Low float64 `json:"low"`
- Close float64 `json:"close"`
- RealVolums float64 `json:"realVol"`
- TickVolums float64 `json:"tickVol"`
- }
- // 实时数据订阅参数
- type SubArgs struct {
- InsId string `json:"insId"` // 产品Id
- Code int64 `json:"code"` // 客户端代码
- IsCancel bool `json:"cancel"` // 是否取消订阅
- }
- func InsIdPrefix(insId string) string {
- return strings.Split(insId, "_")[0]
- }
- func getTime(ts int64) time.Time {
- if ts < 0 {
- return time.Now()
- }
- return time.Unix(ts/1000, (ts%1000)*1e6)
- }
- func GetTime(ts int64) time.Time {
- return getTime(ts)
- }
- func DebugDelay(prefix, insId string, ts int64) {
- // for debug delay
- now := time.Now()
- d := int64(float64(now.UnixNano())*1e-6) - ts
- if d > 1000 {
- log.Println(prefix, "delay > 1000ms", insId, d, getTime(ts))
- }
- }
- func (m *Market) SetIns(ins *Instrument) {
- m.ins = ins
- }
- func UnzipC(r io.Reader) ([]Candle, error) {
- return unzipC(r)
- }
- func unzipC(r io.Reader) ([]Candle, error) {
- gr, err := gzip.NewReader(r)
- if err != nil {
- return nil, err
- }
- defer gr.Close()
- candles := []Candle{}
- for {
- c, err := ReadCandleBinary(gr)
- if err != nil {
- if err != io.EOF {
- return candles, err
- }
- break
- }
- if math.IsNaN(c.Open*2) || math.IsInf(c.Open*2, 0) {
- continue
- }
- if math.IsNaN(c.High*2) || math.IsInf(c.High*2, 0) {
- continue
- }
- if math.IsNaN(c.Low*2) || math.IsInf(c.Low*2, 0) {
- continue
- }
- if math.IsNaN(c.Close*2) || math.IsInf(c.Close*2, 0) {
- continue
- }
- candles = append(candles, *c)
- }
- return candles, nil
- }
- func UnzipT(r io.Reader) ([]Tick, error) {
- return unzipT(r)
- }
- func unzipT(r io.Reader) ([]Tick, error) {
- gr, err := gzip.NewReader(r)
- if err != nil {
- return nil, err
- }
- defer gr.Close()
- ticks := []Tick{}
- for {
- t, err := ReadTickBinary(gr)
- if err != nil {
- if err != io.EOF {
- return nil, err
- }
- break
- }
- if math.IsNaN(t.Price) || math.IsInf(t.Price*2, 0) {
- continue
- }
- ticks = append(ticks, *t)
- }
- return ticks, nil
- }
- func readTickFile(fname string) ([]Tick, error) {
- f, err := os.Open(fname)
- if err != nil {
- // log.Fatal(err.Error() + fname)
- return nil, err
- }
- defer f.Close()
- return unzipT(f)
- }
- func readCandleFile(fname string) ([]Candle, error) {
- f, err := os.Open(fname)
- if err != nil {
- return nil, err
- }
- defer f.Close()
- return unzipC(f)
- }
- func ReadCandleFile(fname string) ([]Candle, error) {
- return readCandleFile(fname)
- }
- func ReadTickFile(fname string) ([]Tick, error) {
- return readTickFile(fname)
- }
- func ReadTickBinary(r io.Reader) (*Tick, error) {
- t := &Tick{}
- err := binary.Read(r, binary.LittleEndian, t)
- if err != nil {
- return nil, err
- }
- return t, nil
- }
- func ReadCandleBinary(r io.Reader) (*Candle, error) {
- c := &Candle{}
- err := binary.Read(r, binary.LittleEndian, c)
- if err != nil {
- return nil, err
- }
- return c, nil
- }
|