market.go 15 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package market
  3. // 本文件包含数据结构的定义以及通用函数的实现
  4. import (
  5. "bufio"
  6. "bytes"
  7. "compress/gzip"
  8. "encoding/binary"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "log"
  13. "math"
  14. "os"
  15. "path"
  16. "sort"
  17. "strings"
  18. "sync"
  19. "time"
  20. "tickserver/framework/event"
  21. )
  22. func DebugDelay(prefix, insId string, ts int64) {
  23. // for debug delay
  24. now := time.Now()
  25. d := int64(float64(now.UnixNano())*1e-6) - ts
  26. if d > 1000 {
  27. //log.Println(prefix, "delay > 1000ms", insId, d, getTime(ts))
  28. }
  29. }
  30. const (
  31. Lmax = "lmax"
  32. Oanda = "oanda"
  33. EasyForex = "easyforex"
  34. Ctp = "ctp"
  35. Fix = "fix"
  36. Dzh = "dzh"
  37. Saxo = "saxo"
  38. Btc = "btc"
  39. Polo = "polo"
  40. Bty = "bty"
  41. CFix = "cfix"
  42. Huobi = "huobi"
  43. Yunbi = "yunbi"
  44. Chbtc = "chbtc"
  45. General = "general"
  46. )
  47. const (
  48. LmaxPrefix = "lmax_"
  49. OandaPrefix = "oanda_"
  50. EasyForexPrefix = "easyforex_"
  51. CtpPrefix = "ctp_"
  52. FixPrefix = "fix_"
  53. DzhPrefix = "dzh_"
  54. SaxoPrefix = "saxo_"
  55. BtcPrefix = "btc_"
  56. PoloPrefix = "polo_"
  57. BtyPrefix = "bty_"
  58. CFixPrefix = "cfix_"
  59. HuobiPrefix = "huobi_"
  60. YunbiPrefix = "yunbi_"
  61. ChbtcPrefix = "chbtc_"
  62. )
  63. // 期货交易所
  64. const (
  65. SHFE = "SHFE" // 上海期货交易所
  66. CFFEX = "CFFEX" // 中国金融交易所
  67. DEC = "DEC" // 大连商品交易所
  68. CZCE = "CZCE" // 郑州商品交易所
  69. )
  70. // 证券交易所
  71. const (
  72. SHEX = "SH" // 上海证券交易所
  73. SZEX = "SZ" // 深证证券交易所
  74. )
  75. // 上交所指数
  76. const (
  77. SHIND1 = "000"
  78. SHIND2 = "H"
  79. )
  80. // 上交所股票
  81. const (
  82. SHA = "60" // 沪市A股
  83. SHB = "900" // 沪市B股
  84. )
  85. // 上交所基金
  86. const (
  87. SHF1 = "50" // 封闭式基金
  88. SHF2 = "510" // ETF
  89. SHF3 = "519" // 实时申赎货币基金
  90. SHF4 = "511" // 交易型货币基金
  91. )
  92. // 上交所债券
  93. const (
  94. SHB1 = "010" // 国债
  95. SHB2 = "130" // 地方政府债券
  96. SHB3 = "12" // 企业债券
  97. SHB4 = "11" // 可转换公司债券
  98. SHB5 = "20" // 债券回购
  99. // SHB6 = "126" // 分离债
  100. // SHB7 = "121" // 资产支持证券
  101. )
  102. var SHSecurites = map[string]string{
  103. SHIND1: "上交所指数",
  104. SHIND2: "上交所指数",
  105. SHA: "沪市A股",
  106. SHB: "沪市B股",
  107. SHF1: "上交所基金",
  108. SHF2: "上交所基金ETF",
  109. SHF3: "上交所基金",
  110. SHF4: "上交所基金",
  111. SHB1: "上交所债券",
  112. SHB2: "上交所债券",
  113. SHB3: "上交所债券",
  114. SHB4: "上交所债券",
  115. SHB5: "上交所债券",
  116. // SHB6,
  117. // SHB7,
  118. }
  119. // 3 9 xxxx 综合指数/成份指数
  120. // 深交所指数
  121. const (
  122. SZIND = "399"
  123. )
  124. // 0 0 xxxx A股证券3 xxxx A股A2权证7 xxxx A股增发8 xxxx A股A1权证9 xxxx A股转配
  125. // 3 0 xxxx 创业板证券7 xxxx 创业板增发8 xxxx 创业板权证
  126. // 深交所股票
  127. const (
  128. SZA = "000" // 深市A股
  129. SZB = "200" // 深市B股
  130. SME = "002" // 中小板
  131. GEM = "30" // 创业板
  132. )
  133. // 1 7 xxxx 原有投资基金8 xxxx 证券投资基金
  134. // 深交所基金
  135. const (
  136. SZF1 = "150" //
  137. SZF2 = "159" // ETF
  138. SZF3 = "16" //
  139. SZF5 = "18" //
  140. )
  141. // 1 0 xxxx 国债现货1 xxxx 债券2 xxxx 可转换债券3 xxxx 国债回购
  142. // 深交所债券
  143. const (
  144. SZB1 = "10"
  145. SZB2 = "11"
  146. SZB3 = "12"
  147. SZB4 = "13"
  148. )
  149. var SZSecurites = map[string]string{
  150. SZIND: "深交所指数",
  151. SZA: "深市A股",
  152. SZB: "深市B股",
  153. SME: "中小板",
  154. GEM: "创业板",
  155. SZF1: "深交所基金",
  156. SZF2: "深交所基金ETF",
  157. SZF3: "深交所基金",
  158. SZF5: "深交所基金",
  159. SZB1: "深交所债券",
  160. SZB2: "深交所债券",
  161. SZB3: "深交所债券",
  162. SZB4: "深交所债券",
  163. }
  164. const (
  165. Custom = "custom"
  166. Forex = "forex"
  167. Futures = "futures"
  168. Securities = "securities"
  169. Btcs = "btcs"
  170. )
  171. var TypeMap = map[string]string{
  172. Lmax: "外汇",
  173. Ctp: "期货",
  174. Dzh: "证券",
  175. Btcs: "虚拟币",
  176. Saxo: "盛宝",
  177. }
  178. type Instrument struct {
  179. Id string `json:"insId"` // ID = 前缀+原始ID
  180. Name string `json:"name"` // 名称
  181. Typ string `json:"type"` // 用来区分种类
  182. ExId string `json:"exid"` // 交易所ID
  183. PriceInc float64 `json:"priceInc"` // 最小加价
  184. Margin float64 `json:"margin"` // 保证金
  185. StartTime int64 `json:"st"` // 上市时间
  186. EndTime int64 `json:"et"` // 下市时间
  187. mu sync.Mutex
  188. mk *Market
  189. mkPublisher event.EventPublisher // mk数据事件
  190. }
  191. func (ins *Instrument) OnMarket() *event.Event {
  192. return ins.mkPublisher.Event()
  193. }
  194. func (ins *Instrument) SetMk(mk *Market) {
  195. ins.mu.Lock()
  196. ins.mk = mk
  197. ins.mu.Unlock()
  198. mk.SetIns(ins)
  199. ins.mkPublisher.Publish(mk) //异步
  200. }
  201. func (ins *Instrument) GetMk() *Market {
  202. ins.mu.Lock()
  203. if ins.mk == nil {
  204. ins.mk = &Market{
  205. ins: ins,
  206. InsId: ins.Id,
  207. }
  208. }
  209. mk := *ins.mk
  210. ins.mu.Unlock()
  211. return &mk
  212. }
  213. func (ins *Instrument) FmtPrice(f float64) string {
  214. if f == math.MaxFloat64 || math.IsInf(f, 0) || math.IsNaN(f) {
  215. return "-"
  216. }
  217. pt := ins.PriceInc
  218. if pt < 0.0000001 {
  219. return fmt.Sprintf("%.8f", f)
  220. } else if pt < 0.000001 {
  221. return fmt.Sprintf("%.7f", f)
  222. } else if pt < 0.00001 {
  223. return fmt.Sprintf("%.6f", f)
  224. } else if pt < 0.0001 {
  225. return fmt.Sprintf("%.5f", f)
  226. } else if pt < 0.001 {
  227. return fmt.Sprintf("%.4f", f)
  228. } else if pt < 0.01 {
  229. return fmt.Sprintf("%.3f", f)
  230. } else if pt < 0.1 {
  231. return fmt.Sprintf("%.2f", f)
  232. } else if pt < 1. {
  233. return fmt.Sprintf("%.1f", f)
  234. } else {
  235. return fmt.Sprintf("%d", int(f))
  236. }
  237. }
  238. func InsIdPrefix(insId string) string {
  239. return strings.Split(insId, "_")[0]
  240. }
  241. func RealInsId(insId string) string {
  242. return strings.Split(insId, "_")[1]
  243. }
  244. // 实时数据订阅参数
  245. type SubArgs struct {
  246. InsId string `json:"insId"` // 产品Id
  247. Code int64 `json:"code"` // 客户端代码
  248. IsCancel bool `json:"cancel"` // 是否取消订阅
  249. }
  250. type PP [2]float64 // [0]为价格, [1]为数量
  251. type Tick struct {
  252. // InsId string `json:"insId"` // 产品ID
  253. Timestamp int64 `json:"ts"` // 时间戳
  254. Price float64 `json:"last"` // 最新价
  255. Volume float64 `json:"volume"` // 本次成交量(增量)
  256. Bid PP `json:"bids"` // 申买
  257. Ask PP `json:"asks"` // 申卖
  258. }
  259. // 市场深度
  260. type Depth struct {
  261. Bids []PP `json:"bids"` // 申买
  262. Asks []PP `json:"asks"` // 申卖
  263. }
  264. // 实时行情数据
  265. type Market struct {
  266. InsId string `json:"insId"` // 产品ID
  267. Timestamp int64 `json:"ts"` // 时间戳
  268. Close float64 `json:"close"` // 昨日收盘价
  269. Open float64 `json:"open"` // 今日开盘价
  270. High float64 `json:"high"` // 当日最高价
  271. Low float64 `json:"low"` // 当日最低价
  272. AllVolume float64 `json:"allVolume"` // 当日成交量
  273. AllAmount float64 `json:"allAmount"` // 成交额
  274. LastPrice float64 `json:"last"` // 最新价
  275. Volume float64 `json:"volume"` // 本次成交量(增量)
  276. Bids []PP `json:"bids"` // 申买
  277. Asks []PP `json:"asks"` // 申卖
  278. ins *Instrument
  279. }
  280. func (m *Market) Ins() *Instrument {
  281. return m.ins
  282. }
  283. func (m *Market) SetIns(ins *Instrument) {
  284. m.ins = ins
  285. }
  286. func Market2TickByBid(m *Market) *Tick {
  287. t := Market2Tick(m)
  288. if len(m.Bids) > 0 {
  289. t.Price = m.Bids[0][0]
  290. t.Volume = m.Bids[0][1]
  291. }
  292. return t
  293. }
  294. func Market2Tick(m *Market) *Tick {
  295. t := &Tick{
  296. Timestamp: m.Timestamp,
  297. Price: m.LastPrice,
  298. Volume: m.Volume,
  299. }
  300. if len(m.Asks) > 0 {
  301. t.Ask = m.Asks[0]
  302. }
  303. if len(m.Bids) > 0 {
  304. t.Bid = m.Bids[0]
  305. }
  306. return t
  307. }
  308. func Market2Depth(mk *Market) *Depth {
  309. return &Depth{
  310. mk.Bids,
  311. mk.Asks,
  312. }
  313. }
  314. func WriteBinary(w io.Writer, v interface{}) error {
  315. return binary.Write(w, binary.LittleEndian, v)
  316. }
  317. func WriteTickBinary(w io.Writer, t *Tick) error {
  318. return binary.Write(w, binary.LittleEndian, t)
  319. }
  320. func ReadTickBinary(r io.Reader) (*Tick, error) {
  321. t := &Tick{}
  322. err := binary.Read(r, binary.LittleEndian, t)
  323. if err != nil {
  324. return nil, err
  325. }
  326. return t, nil
  327. }
  328. type Candle struct {
  329. Timestamp int64 `json:"ts"`
  330. Open float64 `json:"open"`
  331. High float64 `json:"high"`
  332. Low float64 `json:"low"`
  333. Close float64 `json:"close"`
  334. RealVolums float64 `json:"realVol"`
  335. TickVolums float64 `json:"tickVol"`
  336. }
  337. func WriteCandleBinary(w io.Writer, c *Candle) error {
  338. return binary.Write(w, binary.LittleEndian, c)
  339. }
  340. func ReadCandleBinary(r io.Reader) (*Candle, error) {
  341. c := &Candle{}
  342. err := binary.Read(r, binary.LittleEndian, c)
  343. if err != nil {
  344. return nil, err
  345. }
  346. return c, nil
  347. }
  348. type CandleBuf struct {
  349. sync.Mutex
  350. Buf []Candle
  351. }
  352. type tickBuf struct {
  353. sync.Mutex
  354. buf []Tick
  355. }
  356. type SearchArgs struct {
  357. N int
  358. TS int64
  359. }
  360. func (buf *tickBuf) add(t *Tick) bool {
  361. buf.Lock()
  362. defer buf.Unlock()
  363. buf.buf = append(buf.buf, *t)
  364. if len(buf.buf) >= 2000 {
  365. return true
  366. } else {
  367. return false
  368. }
  369. }
  370. func (buf *CandleBuf) add(c *Candle, period int) {
  371. buf.Lock()
  372. defer buf.Unlock()
  373. bufLen := 1000
  374. if period == M1 {
  375. bufLen = 1500
  376. }
  377. if len(buf.Buf) < bufLen {
  378. buf.Buf = append(buf.Buf, *c)
  379. } else {
  380. buf.Buf = buf.Buf[1:]
  381. buf.Buf = append(buf.Buf, *c)
  382. }
  383. }
  384. func (buf *tickBuf) leng() int {
  385. buf.Lock()
  386. defer buf.Unlock()
  387. return len(buf.buf)
  388. }
  389. func (buf *CandleBuf) leng() int {
  390. buf.Lock()
  391. defer buf.Unlock()
  392. return len(buf.Buf)
  393. }
  394. func (buf *CandleBuf) Last() *Candle {
  395. buf.Lock()
  396. defer buf.Unlock()
  397. if len(buf.Buf) == 0 {
  398. return nil
  399. }
  400. return &buf.Buf[len(buf.Buf)-1]
  401. }
  402. func (buf *CandleBuf) at(i int) *Candle {
  403. buf.Lock()
  404. defer buf.Unlock()
  405. if len(buf.Buf)-1 < i {
  406. return nil
  407. }
  408. c := buf.Buf[i]
  409. return &c
  410. }
  411. func (buf *CandleBuf) Search(args *SearchArgs) ([]Candle, error) {
  412. buf.Lock()
  413. defer buf.Unlock()
  414. p := len(buf.Buf)
  415. if p == 0 {
  416. return nil, ErrNotEnough
  417. }
  418. if args.TS == TimeNow {
  419. p -= 1
  420. } else {
  421. p = sort.Search(len(buf.Buf), func(i int) bool {
  422. c := buf.Buf[i]
  423. if c.Timestamp >= args.TS {
  424. return true
  425. }
  426. return false
  427. })
  428. }
  429. if p != len(buf.Buf) { // 在缓存中
  430. n := args.N
  431. if n < 0 {
  432. n = -n
  433. if p >= n-1 {
  434. return buf.Buf[p-n+1 : p+1], nil
  435. }
  436. // 缓存中不够
  437. args.N = p - n + 1
  438. args.TS = buf.Buf[0].Timestamp
  439. return buf.Buf[:p], ErrNotEnough
  440. }
  441. if len(buf.Buf)-p > n {
  442. return buf.Buf[p : p+n], nil
  443. }
  444. p := len(buf.Buf) - n
  445. if p < 0 {
  446. p = 0
  447. }
  448. return buf.Buf[p:], nil
  449. }
  450. return nil, ErrNotEnough
  451. }
  452. func getTime(ts int64) time.Time {
  453. if ts < 0 {
  454. return time.Now()
  455. }
  456. return time.Unix(ts/1000, (ts%1000)*1e6)
  457. }
  458. func GetTime(ts int64) time.Time {
  459. return getTime(ts)
  460. }
  461. var ErrNoDataBefore = errors.New("No Data before")
  462. func ZipBuf(v interface{}) ([]byte, error) {
  463. return zipBuf(v)
  464. }
  465. func zipBuf(v interface{}) ([]byte, error) {
  466. buf := &bytes.Buffer{}
  467. candles, ok := v.([]Candle)
  468. if ok {
  469. err := zipCBuf(buf, candles)
  470. if err != nil {
  471. return nil, err
  472. }
  473. return buf.Bytes(), nil
  474. }
  475. ticks, ok := v.([]Tick)
  476. if ok {
  477. err := zipTBuf(buf, ticks)
  478. if err != nil {
  479. return nil, err
  480. }
  481. return buf.Bytes(), nil
  482. }
  483. return nil, errors.New("zipBuf error: paramter v is NOT []Tick or []Candle")
  484. }
  485. func UnzipBufT(b []byte) ([]Tick, error) {
  486. return unzipBufT(b)
  487. }
  488. func unzipBufT(b []byte) ([]Tick, error) {
  489. r := bytes.NewReader(b)
  490. return unzipT(r)
  491. }
  492. func UnzipBufC(b []byte) ([]Candle, error) {
  493. return unzipBufC(b)
  494. }
  495. func unzipBufC(b []byte) ([]Candle, error) {
  496. r := bytes.NewReader(b)
  497. return unzipC(r)
  498. }
  499. func ZipTBuf(w io.Writer, ticks []Tick) error {
  500. return zipTBuf(w, ticks)
  501. }
  502. func zipTBuf(w io.Writer, ticks []Tick) error {
  503. gw := gzip.NewWriter(w)
  504. defer gw.Close()
  505. for _, t := range ticks {
  506. err := WriteTickBinary(gw, &t)
  507. if err != nil {
  508. return err
  509. }
  510. }
  511. return gw.Flush()
  512. }
  513. func ZipCBuf(w io.Writer, candles []Candle) error {
  514. return zipCBuf(w, candles)
  515. }
  516. func zipCBuf(w io.Writer, candles []Candle) error {
  517. gw := gzip.NewWriter(w)
  518. defer gw.Close()
  519. for _, c := range candles {
  520. err := WriteCandleBinary(gw, &c)
  521. if err != nil {
  522. return err
  523. }
  524. }
  525. return gw.Flush()
  526. }
  527. func UnzipC(r io.Reader) ([]Candle, error) {
  528. return unzipC(r)
  529. }
  530. func readR(r io.Reader) ([]Candle, error) {
  531. candles := []Candle{}
  532. for {
  533. c, err := ReadCandleBinary(r)
  534. if err != nil {
  535. if err != io.EOF {
  536. return candles, err
  537. }
  538. break
  539. }
  540. if math.IsNaN(c.Open*2) || math.IsInf(c.Open*2, 0) {
  541. continue
  542. }
  543. if math.IsNaN(c.High*2) || math.IsInf(c.High*2, 0) {
  544. continue
  545. }
  546. if math.IsNaN(c.Low*2) || math.IsInf(c.Low*2, 0) {
  547. continue
  548. }
  549. if math.IsNaN(c.Close*2) || math.IsInf(c.Close*2, 0) {
  550. continue
  551. }
  552. candles = append(candles, *c)
  553. }
  554. return candles, nil
  555. }
  556. func unzipC(r io.Reader) ([]Candle, error) {
  557. gr, err := gzip.NewReader(r)
  558. if err != nil {
  559. return nil, err
  560. }
  561. defer gr.Close()
  562. candles := []Candle{}
  563. for {
  564. c, err := ReadCandleBinary(gr)
  565. if err != nil {
  566. if err != io.EOF {
  567. return candles, err
  568. }
  569. break
  570. }
  571. if math.IsNaN(c.Open*2) || math.IsInf(c.Open*2, 0) {
  572. continue
  573. }
  574. if math.IsNaN(c.High*2) || math.IsInf(c.High*2, 0) {
  575. continue
  576. }
  577. if math.IsNaN(c.Low*2) || math.IsInf(c.Low*2, 0) {
  578. continue
  579. }
  580. if math.IsNaN(c.Close*2) || math.IsInf(c.Close*2, 0) {
  581. continue
  582. }
  583. candles = append(candles, *c)
  584. }
  585. return candles, nil
  586. }
  587. func UnzipT(r io.Reader) ([]Tick, error) {
  588. return unzipT(r)
  589. }
  590. func unzipT(r io.Reader) ([]Tick, error) {
  591. gr, err := gzip.NewReader(r)
  592. if err != nil {
  593. return nil, err
  594. }
  595. defer gr.Close()
  596. ticks := []Tick{}
  597. for {
  598. t, err := ReadTickBinary(gr)
  599. if err != nil {
  600. if err != io.EOF {
  601. return nil, err
  602. }
  603. break
  604. }
  605. if math.IsNaN(t.Price) || math.IsInf(t.Price*2, 0) {
  606. continue
  607. }
  608. ticks = append(ticks, *t)
  609. }
  610. return ticks, nil
  611. }
  612. var ErrNotEnough = errors.New("Not enough data")
  613. func readTickFile(fname string) ([]Tick, error) {
  614. f, err := os.Open(fname)
  615. if err != nil {
  616. log.Fatal(err.Error() + fname)
  617. return nil, err
  618. }
  619. defer f.Close()
  620. return unzipT(f)
  621. }
  622. func ReadNoZipCandleFile(fname string) ([]Candle, error) {
  623. f, err := os.Open(fname)
  624. if err != nil {
  625. return nil, err
  626. }
  627. defer f.Close()
  628. return readR(f)
  629. }
  630. func readCandleFile(fname string) ([]Candle, error) {
  631. f, err := os.Open(fname)
  632. if err != nil {
  633. return nil, err
  634. }
  635. defer f.Close()
  636. return unzipC(f)
  637. }
  638. func ReadCandleFile(fname string) ([]Candle, error) {
  639. return readCandleFile(fname)
  640. }
  641. func ReadTickFile(fname string) ([]Tick, error) {
  642. return readTickFile(fname)
  643. }
  644. func SaveTickEx(dataDir string, ts []Tick, insId string, bTruncate bool) (string, error) {
  645. if len(ts) == 0 {
  646. return "", errors.New("len(ts) == 0")
  647. }
  648. t := time.Unix(ts[0].Timestamp/1000, 0)
  649. dir := path.Join(dataDir, insId, fmt.Sprint(t.Year()))
  650. os.MkdirAll(dir, 0777)
  651. fname := path.Join(dir, fmt.Sprintf("%04d%02d%02d.tk.gz", t.Year(), t.Month(), t.Day()))
  652. var w io.WriteCloser
  653. var err error
  654. if bTruncate {
  655. w, err = os.Create(fname)
  656. } else {
  657. w, err = os.OpenFile(fname, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
  658. }
  659. if err != nil {
  660. return "", errors.New("SaveTicks os.Create error:" + err.Error())
  661. }
  662. defer w.Close()
  663. gw := gzip.NewWriter(w)
  664. bw := bufio.NewWriter(gw)
  665. for _, v := range ts {
  666. binary.Write(bw, binary.LittleEndian, v)
  667. }
  668. bw.Flush()
  669. gw.Close()
  670. return fname, nil
  671. }
  672. // 把之前文件中candles和新的合并
  673. func combinEx(filename string, candles []Candle) ([]Candle, error) {
  674. buf, err := ReadCandleFile(filename)
  675. if err != nil {
  676. return candles, err
  677. }
  678. //
  679. n := len(buf)
  680. if candles[0].Timestamp == buf[n-1].Timestamp {
  681. buf[n-1].High = max(buf[n-1].High, candles[0].High)
  682. buf[n-1].Low = min(buf[n-1].Low, candles[0].Low)
  683. buf[n-1].Close = candles[0].Close
  684. buf[n-1].RealVolums += candles[0].RealVolums
  685. buf[n-1].TickVolums += candles[0].TickVolums
  686. candles = append(buf, candles[1:]...)
  687. } else {
  688. candles = append(buf, candles...)
  689. }
  690. return candles, nil
  691. }