pconv.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package market
  3. import (
  4. "errors"
  5. "fmt"
  6. "io"
  7. //"log"
  8. "os"
  9. "strings"
  10. "time"
  11. "unsafe"
  12. "tickserver/framework/base"
  13. )
  14. // pconv.go 实现周期的转换, 包括tick到任意周期, 以及低周期向高周期的转换
  15. // 周期定义
  16. const (
  17. TK = 0
  18. S1 = 1
  19. S2 = 2
  20. S3 = 3
  21. S4 = 4
  22. S5 = 5
  23. S15 = 15
  24. S30 = 30
  25. M1 = 1 * 60
  26. M2 = 2 * 60
  27. M3 = 3 * 60
  28. M4 = 4 * 60
  29. M5 = 5 * 60
  30. M15 = 15 * 60
  31. M30 = 30 * 60
  32. H1 = 60 * 60
  33. H2 = 2 * 60 * 60
  34. H4 = 4 * 60 * 60
  35. D1 = 24 * 3600
  36. W1 = 7 * 24 * 3600
  37. MN1 = 30 * 24 * 3600
  38. )
  39. var PeriodNameMap = map[int]string{
  40. TK: "Tick",
  41. S1: "S1",
  42. S2: "S2",
  43. S3: "S3",
  44. S4: "S4",
  45. S5: "S5",
  46. S15: "S15",
  47. S30: "S30",
  48. M1: "M1",
  49. M2: "M2",
  50. M3: "M3",
  51. M4: "M4",
  52. M5: "M5",
  53. M15: "M15",
  54. M30: "M30",
  55. H1: "H1",
  56. H2: "H2",
  57. H4: "H4",
  58. D1: "D1",
  59. W1: "W1",
  60. MN1: "MN1",
  61. }
  62. var PeriodIdMap = map[string]int{
  63. "Tick": TK,
  64. "S1": S1,
  65. "S2": S2,
  66. "S3": S3,
  67. "S4": S4,
  68. "S5": S5,
  69. "S15": S15,
  70. "S30": S30,
  71. "M1": M1,
  72. "M2": M2,
  73. "M3": M3,
  74. "M4": M4,
  75. "M5": M5,
  76. "M15": M15,
  77. "M30": M30,
  78. "H1": H1,
  79. "H2": H2,
  80. "H4": H4,
  81. "D1": D1,
  82. "W1": W1,
  83. "MN1": MN1,
  84. }
  85. // Tick到周期的转换
  86. type TickReader interface {
  87. Read() (*Tick, error)
  88. }
  89. type CandleReader interface {
  90. Read() (*Candle, error)
  91. }
  92. type TickBuffer struct {
  93. ch chan *Tick
  94. }
  95. func NewTickBuffer(ch chan *Tick) *TickBuffer {
  96. return &TickBuffer{ch}
  97. }
  98. func (buf *TickBuffer) Write(t *Tick) {
  99. buf.ch <- t
  100. }
  101. func (buf *TickBuffer) Read() (*Tick, error) {
  102. t := <-buf.ch
  103. if t == nil {
  104. return nil, errReadEOF
  105. }
  106. return t, nil
  107. }
  108. type candleAndErr struct {
  109. candle *Candle
  110. err error
  111. }
  112. type candleBuffer struct {
  113. ch chan candleAndErr
  114. }
  115. func (buf *candleBuffer) Read() (*Candle, error) {
  116. ce := <-buf.ch
  117. return ce.candle, ce.err
  118. }
  119. func (buf *candleBuffer) write(candle *Candle, err error) {
  120. buf.ch <- candleAndErr{candle, err}
  121. }
  122. func max(p1, p2 float64) float64 {
  123. if p1 < p2 {
  124. return p2
  125. }
  126. return p1
  127. }
  128. func min(p1, p2 float64) float64 {
  129. if p1 > p2 {
  130. return p2
  131. }
  132. return p1
  133. }
  134. type tickConver struct {
  135. r TickReader
  136. buf *candleBuffer
  137. period int
  138. c *Candle
  139. cg *base.Candle
  140. ohlc base.Ohlc
  141. }
  142. // 用来转换周期
  143. func convTS(ts int64, period int) int64 {
  144. ts = (3600000*8 + ts) / (int64(period) * 1000) * (int64(period) * 1000) // 时间按照周期取整
  145. ts -= 3600000 * 8
  146. return ts
  147. }
  148. func (tc *tickConver) convEx(t *Tick) []*Candle {
  149. tg := Tk2Tg(*t)
  150. num := tc.cg.UpdateTick((*base.Tick)(unsafe.Pointer(&tg)))
  151. var tmpcandles []*Candle
  152. //ohlc := base.Ohlc{}
  153. if num == 0 {
  154. tc.cg.Next(&tc.ohlc)
  155. ohlcGo := tc.ohlc.ToGOStruct()
  156. tmpcandle := OhlcGo2Candle(ohlcGo)
  157. tmpcandles = append(tmpcandles, &tmpcandle)
  158. } else if num > 0 {
  159. for i := 0; i < num; i++ {
  160. tc.cg.Next(&tc.ohlc)
  161. ohlcGo := tc.ohlc.ToGOStruct()
  162. tmpcandle := OhlcGo2Candle(ohlcGo)
  163. tmpcandles = append(tmpcandles, &tmpcandle)
  164. }
  165. } else {
  166. //log.Println("tick error.")
  167. }
  168. var candles []*Candle
  169. for _, tmpcandle := range tmpcandles {
  170. if len(candles) > 0 && candles[len(candles)-1].Timestamp == tmpcandle.Timestamp {
  171. candles[len(candles)-1] = tmpcandle
  172. } else {
  173. candles = append(candles, tmpcandle)
  174. }
  175. }
  176. return candles
  177. }
  178. /*func (tc *tickConver) conv(t *Tick) *Candle {
  179. price := t.Price
  180. volumns := t.Volume
  181. c := tc.c
  182. ts := convTS(t.Timestamp, tc.period)
  183. if c == nil || c.Timestamp < ts { // 产生新的K线的条件
  184. c = &Candle{
  185. Open: price,
  186. High: price,
  187. Low: price,
  188. Close: price,
  189. RealVolums: t.Volume,
  190. TickVolums: 1,
  191. }
  192. tc.c = c
  193. } else if c.Timestamp == ts {
  194. c.High = max(c.High, price)
  195. c.Low = min(c.Low, price)
  196. c.Close = price
  197. c.TickVolums += 1
  198. c.RealVolums += volumns
  199. } else {
  200. return nil
  201. }
  202. c.Timestamp = ts
  203. return c
  204. }*/
  205. func (tc *tickConver) doConv() {
  206. for {
  207. t, err := tc.r.Read()
  208. if err != nil {
  209. // log.Println("doConv error:", err)
  210. ohlcGo := tc.ohlc.ToGOStruct()
  211. if ohlcGo.Time != 0 {
  212. candle := OhlcGo2Candle(ohlcGo)
  213. tc.buf.write(&candle, nil)
  214. }
  215. tc.buf.write(nil, errReadEOF)
  216. return
  217. }
  218. if t.Price == 0 {
  219. continue
  220. }
  221. c := tc.convEx(t)
  222. if tc.period == M1 {
  223. //testCandle(1, nil, t, c, true)
  224. }
  225. for _, v := range c {
  226. if v != nil {
  227. candle := *v // copy
  228. tc.buf.write(&candle, nil)
  229. }
  230. }
  231. }
  232. }
  233. func TickConv(r TickReader, insId string, period int, c *Candle) CandleReader {
  234. return tickConv(r, insId, period, c)
  235. }
  236. func tickConv(r TickReader, insId string, period int, c *Candle) CandleReader {
  237. buf := &candleBuffer{ch: make(chan candleAndErr)}
  238. cg, _ := base.NewCandle(period, 2, nil, 0)
  239. if strings.HasPrefix(insId, Ctp) {
  240. cg.Set(base.CANDLE_AUTOCOMPLETE_MAX, 1)
  241. }
  242. tc := &tickConver{
  243. r: r,
  244. period: period,
  245. buf: buf,
  246. c: c,
  247. cg: cg,
  248. }
  249. go tc.doConv()
  250. return buf
  251. }
  252. type tconver struct {
  253. ch chan *Tick
  254. c *Candle
  255. cg *base.Candle
  256. ohlc base.Ohlc
  257. insId string
  258. period int
  259. }
  260. func Tk2Tg(tk Tick) base.TickGo {
  261. var tg base.TickGo
  262. tg.Time = int32(tk.Timestamp / 1000)
  263. tg.Ms = int16(tk.Timestamp % 1000)
  264. tg.Symbol = 0
  265. tg.Bid = float32(tk.Price) //tk.Bid[0]
  266. //tg.Ask = float32(tk.Price) //tk.Ask[0]
  267. tg.Bidv = float32(tk.Volume) //tk.Bid[1]
  268. //tg.Askv = int32(tk.Volume) //tk.Ask[1]
  269. return tg
  270. }
  271. func OhlcGo2Candle(ohlcGo base.OhlcGo) Candle {
  272. var c Candle
  273. c.Timestamp = int64(ohlcGo.Time) * 1000
  274. c.Open = ohlcGo.Open
  275. c.High = ohlcGo.High
  276. c.Low = ohlcGo.Low
  277. c.Close = ohlcGo.Close
  278. c.TickVolums = float64(ohlcGo.TickVolumn)
  279. c.RealVolums = float64(ohlcGo.RealVolumn)
  280. return c
  281. }
  282. func Candle2OhlcGo(c Candle) base.OhlcGo {
  283. var ohlcGo base.OhlcGo
  284. ohlcGo.Time = int32(c.Timestamp / 1000)
  285. ohlcGo.Open = c.Open
  286. ohlcGo.High = c.High
  287. ohlcGo.Low = c.Low
  288. ohlcGo.Close = c.Close
  289. ohlcGo.TickVolumn = int64(c.TickVolums)
  290. ohlcGo.RealVolumn = c.RealVolums
  291. return ohlcGo
  292. }
  293. func (tc *tconver) convEx(t *Tick) []*Candle {
  294. tg := Tk2Tg(*t)
  295. num := tc.cg.UpdateTick((*base.Tick)(unsafe.Pointer(&tg)))
  296. var tmpcandles []*Candle
  297. //ohlc := base.Ohlc{}
  298. if num == 0 {
  299. tc.cg.Next(&tc.ohlc)
  300. ohlcGo := tc.ohlc.ToGOStruct()
  301. tmpcandle := OhlcGo2Candle(ohlcGo)
  302. tmpcandles = append(tmpcandles, &tmpcandle)
  303. } else if num > 0 {
  304. for i := 0; i < num; i++ {
  305. tc.cg.Next(&tc.ohlc)
  306. ohlcGo := tc.ohlc.ToGOStruct()
  307. tmpcandle := OhlcGo2Candle(ohlcGo)
  308. tmpcandles = append(tmpcandles, &tmpcandle)
  309. }
  310. } else {
  311. //log.Println("tick error.")
  312. }
  313. var candles []*Candle
  314. for _, tmpcandle := range tmpcandles {
  315. if len(candles) > 0 && candles[len(candles)-1].Timestamp == tmpcandle.Timestamp {
  316. candles[len(candles)-1] = tmpcandle
  317. } else {
  318. candles = append(candles, tmpcandle)
  319. }
  320. }
  321. return candles
  322. }
  323. /*func (tc *tconver) conv(t *Tick) *Candle {
  324. price := t.Price
  325. volumns := t.Volume
  326. c := tc.c
  327. ts := convTS(t.Timestamp, tc.period)
  328. if c == nil || c.Timestamp < ts { // 产生新的K线的条件
  329. c = &Candle{
  330. Open: price,
  331. High: price,
  332. Low: price,
  333. Close: price,
  334. RealVolums: t.Volume,
  335. TickVolums: 1,
  336. }
  337. tc.c = c
  338. } else if c.Timestamp == ts {
  339. c.High = max(c.High, price)
  340. c.Low = min(c.Low, price)
  341. c.Close = price
  342. c.TickVolums += 1
  343. c.RealVolums += volumns
  344. } else {
  345. return nil
  346. }
  347. c.Timestamp = ts
  348. return c
  349. }*/
  350. type candleConver struct {
  351. r CandleReader
  352. buf *candleBuffer
  353. period int
  354. c *Candle
  355. cg *base.Candle
  356. ohlc base.Ohlc
  357. }
  358. func (tc *candleConver) doConv() {
  359. for {
  360. c, err := tc.r.Read()
  361. if err != nil {
  362. ohlcGo := tc.ohlc.ToGOStruct()
  363. if ohlcGo.Time != 0 {
  364. candle := OhlcGo2Candle(ohlcGo)
  365. tc.buf.write(&candle, nil)
  366. }
  367. tc.buf.write(nil, errReadEOF)
  368. return
  369. }
  370. cc := tc.convEx(c)
  371. if tc.period == M1 {
  372. //testCandle(2, c, nil, cc, false)
  373. }
  374. for _, v := range cc {
  375. if v != nil {
  376. candle := *v // copy
  377. tc.buf.write(&candle, nil)
  378. }
  379. }
  380. }
  381. }
  382. var testCandleFile [6]*os.File
  383. func testCandle(testIndex int, candle *Candle, tick *Tick, candles []*Candle, tOrc bool) {
  384. if testCandleFile[2*testIndex] == nil && testCandleFile[2*testIndex+1] == nil {
  385. fName := fmt.Sprintf("%d", testIndex)
  386. originalFileName := fName + "_original.txt"
  387. candleFileName := fName + "_candle.txt"
  388. testCandleFile[2*testIndex], _ = os.Create(originalFileName)
  389. testCandleFile[2*testIndex+1], _ = os.Create(candleFileName)
  390. }
  391. if tOrc {
  392. testTime := time.Unix(tick.Timestamp/1000, 0)
  393. fmt.Fprintln(testCandleFile[2*testIndex], testTime, *tick)
  394. } else {
  395. testTime := time.Unix(candle.Timestamp/1000, 0)
  396. fmt.Fprintln(testCandleFile[2*testIndex], testTime, *candle)
  397. }
  398. for _, v := range candles {
  399. testTime := time.Unix(v.Timestamp/1000, 0)
  400. fmt.Fprintln(testCandleFile[2*testIndex+1], testTime, *v)
  401. }
  402. }
  403. func (tc *candleConver) convEx(candle *Candle) []*Candle {
  404. ohlcGo := Candle2OhlcGo(*candle)
  405. num := tc.cg.UpdateOhlc(ohlcGo.ToCStruct())
  406. var candles []*Candle
  407. //ohlc := base.Ohlc{}
  408. if num == 0 {
  409. tc.cg.Next(&tc.ohlc)
  410. //ohlcGo := tc.ohlc.ToGOStruct()
  411. //candle := OhlcGo2Candle(ohlcGo)
  412. //candles = append(candles, &candle)
  413. } else if num > 0 {
  414. for i := 0; i < num; i++ {
  415. ohlcGo := tc.ohlc.ToGOStruct()
  416. if ohlcGo.Time != 0 {
  417. candle := OhlcGo2Candle(ohlcGo)
  418. candles = append(candles, &candle)
  419. }
  420. tc.cg.Next(&tc.ohlc)
  421. }
  422. } else {
  423. //log.Println("tick error.")
  424. }
  425. return candles
  426. }
  427. /*func (tc *candleConver) conv(candle *Candle) *Candle {
  428. c := tc.c
  429. ts := convTS(candle.Timestamp, tc.period)
  430. if c == nil || c.Timestamp < ts { // new candle
  431. c = &Candle{}
  432. *c = *candle
  433. tc.c = c
  434. } else if c.Timestamp == ts { // same period
  435. c.High = max(c.High, candle.High)
  436. c.Low = min(c.Low, candle.Low)
  437. c.Close = candle.Close
  438. c.TickVolums += candle.TickVolums
  439. c.RealVolums += candle.RealVolums
  440. } else {
  441. return nil
  442. }
  443. c.Timestamp = ts
  444. return c
  445. }*/
  446. func CandleConv(r CandleReader, insId string, period int) CandleReader {
  447. buf := &candleBuffer{ch: make(chan candleAndErr)}
  448. cg, _ := base.NewCandle(period, 2, nil, 0)
  449. if strings.HasPrefix(insId, Ctp) {
  450. cg.Set(base.CANDLE_AUTOCOMPLETE_MAX, 1)
  451. }
  452. cc := &candleConver{
  453. r: r,
  454. period: period,
  455. buf: buf,
  456. cg: cg,
  457. }
  458. go cc.doConv()
  459. return buf
  460. }
  461. func TickConvCandle(r TickReader, insId string, period int) ([]Candle, error) {
  462. return tickConvCandle(r, insId, period)
  463. }
  464. func tickConvCandle(r TickReader, insId string, period int) ([]Candle, error) {
  465. reader := tickConv(r, insId, period, nil)
  466. var buf []Candle
  467. var prev *Candle
  468. for {
  469. candle, err := reader.Read()
  470. if err == errReadEOF {
  471. // 添加最后的Candle
  472. if prev != nil {
  473. buf = append(buf, *prev)
  474. }
  475. break
  476. }
  477. if err != nil {
  478. break
  479. }
  480. if prev == nil {
  481. prev = candle
  482. }
  483. if prev.Timestamp != candle.Timestamp { //产生新的K线
  484. buf = append(buf, *prev)
  485. }
  486. prev = candle
  487. }
  488. return buf, nil
  489. }
  490. func ConvPeriod(r CandleReader, insId string, period int) ([]Candle, error) {
  491. return convPeriod(r, insId, period)
  492. }
  493. func convPeriod(r CandleReader, insId string, period int) ([]Candle, error) {
  494. reader := CandleConv(r, insId, period /*, UseBid, true*/)
  495. var buf []Candle
  496. var prev *Candle
  497. for {
  498. candle, err := reader.Read()
  499. if err == errReadEOF {
  500. // 添加最后的Candle
  501. if prev != nil {
  502. buf = append(buf, *prev)
  503. }
  504. break
  505. }
  506. if err != nil {
  507. break
  508. }
  509. if prev == nil {
  510. prev = candle
  511. }
  512. if prev.Timestamp != candle.Timestamp { //产生新的K线
  513. buf = append(buf, *prev)
  514. }
  515. prev = candle
  516. }
  517. return buf, nil
  518. }
  519. type TickBuf struct {
  520. ticks []Tick // 存储tick数据
  521. rd int
  522. }
  523. func NewTickBuf(ticks []Tick) *TickBuf {
  524. return &TickBuf{ticks: ticks}
  525. }
  526. var errReadEOF = errors.New("read EOF")
  527. func (r *TickBuf) Read() (*Tick, error) {
  528. if r.rd == len(r.ticks) {
  529. return nil, errReadEOF
  530. }
  531. t := &r.ticks[r.rd]
  532. r.rd += 1
  533. return t, nil
  534. }
  535. type candleBuf struct {
  536. candles []Candle // 存储tick数据
  537. rd int
  538. }
  539. func NewCandleBuf(candles []Candle) *candleBuf {
  540. return &candleBuf{candles: candles}
  541. }
  542. func (r *candleBuf) Read() (*Candle, error) {
  543. if r.rd == len(r.candles) {
  544. return nil, errReadEOF
  545. }
  546. candle := &r.candles[r.rd]
  547. r.rd += 1
  548. return candle, nil
  549. }
  550. type BinaryReader struct {
  551. R io.Reader
  552. }
  553. func (r *BinaryReader) Read() (*Candle, error) {
  554. return ReadCandleBinary(r.R)
  555. }
  556. type TickBinaryReader struct {
  557. R io.Reader
  558. }
  559. func (r *TickBinaryReader) Read() (*Tick, error) {
  560. return ReadTickBinary(r.R)
  561. }