123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package market
- import (
- "errors"
- "fmt"
- "io"
- //"log"
- "os"
- "strings"
- "time"
- "unsafe"
- "tickserver/framework/base"
- )
// pconv.go implements period conversion: ticks to an arbitrary period, and
// lower periods to higher periods.

// Period identifiers. TK denotes raw ticks; every other constant is the
// period length in seconds. MN1 is fixed at 30 days.
const (
	TK = 0

	// Second-based periods.
	S1  = 1
	S2  = 2 * S1
	S3  = 3 * S1
	S4  = 4 * S1
	S5  = 5 * S1
	S15 = 15 * S1
	S30 = 30 * S1

	// Minute-based periods.
	M1  = 60 * S1
	M2  = 2 * M1
	M3  = 3 * M1
	M4  = 4 * M1
	M5  = 5 * M1
	M15 = 15 * M1
	M30 = 30 * M1

	// Hour-based periods.
	H1 = 60 * M1
	H2 = 2 * H1
	H4 = 4 * H1

	// Day, week and (30-day) month.
	D1  = 24 * H1
	W1  = 7 * D1
	MN1 = 30 * D1
)
- var PeriodNameMap = map[int]string{
- TK: "Tick",
- S1: "S1",
- S2: "S2",
- S3: "S3",
- S4: "S4",
- S5: "S5",
- S15: "S15",
- S30: "S30",
- M1: "M1",
- M2: "M2",
- M3: "M3",
- M4: "M4",
- M5: "M5",
- M15: "M15",
- M30: "M30",
- H1: "H1",
- H2: "H2",
- H4: "H4",
- D1: "D1",
- W1: "W1",
- MN1: "MN1",
- }
- var PeriodIdMap = map[string]int{
- "Tick": TK,
- "S1": S1,
- "S2": S2,
- "S3": S3,
- "S4": S4,
- "S5": S5,
- "S15": S15,
- "S30": S30,
- "M1": M1,
- "M2": M2,
- "M3": M3,
- "M4": M4,
- "M5": M5,
- "M15": M15,
- "M30": M30,
- "H1": H1,
- "H2": H2,
- "H4": H4,
- "D1": D1,
- "W1": W1,
- "MN1": MN1,
- }
- // Tick到周期的转换
- type TickReader interface {
- Read() (*Tick, error)
- }
- type CandleReader interface {
- Read() (*Candle, error)
- }
- type TickBuffer struct {
- ch chan *Tick
- }
- func NewTickBuffer(ch chan *Tick) *TickBuffer {
- return &TickBuffer{ch}
- }
- func (buf *TickBuffer) Write(t *Tick) {
- buf.ch <- t
- }
- func (buf *TickBuffer) Read() (*Tick, error) {
- t := <-buf.ch
- if t == nil {
- return nil, errReadEOF
- }
- return t, nil
- }
- type candleAndErr struct {
- candle *Candle
- err error
- }
- type candleBuffer struct {
- ch chan candleAndErr
- }
- func (buf *candleBuffer) Read() (*Candle, error) {
- ce := <-buf.ch
- return ce.candle, ce.err
- }
- func (buf *candleBuffer) write(candle *Candle, err error) {
- buf.ch <- candleAndErr{candle, err}
- }
// max returns the larger of p1 and p2. When the comparison is false (equal
// values, or either operand is NaN) p1 is returned.
func max(p1, p2 float64) float64 {
	larger := p1
	if p2 > p1 {
		larger = p2
	}
	return larger
}
// min returns the smaller of p1 and p2. When the comparison is false (equal
// values, or either operand is NaN) p1 is returned.
func min(p1, p2 float64) float64 {
	smaller := p1
	if p2 < p1 {
		smaller = p2
	}
	return smaller
}
- type tickConver struct {
- r TickReader
- buf *candleBuffer
- period int
- c *Candle
- cg *base.Candle
- ohlc base.Ohlc
- }
// convTS aligns a millisecond timestamp to the start of its period.
//
// ts is a timestamp in milliseconds and period is the period length in
// seconds. The timestamp is shifted by +8 hours before rounding and shifted
// back afterwards, so period boundaries (for example daily bars) fall on
// UTC+8 wall-clock boundaries.
//
// A non-positive period (such as TK, which is 0) has no boundary to align to;
// ts is returned unchanged instead of dividing by zero. Rounding is always
// towards negative infinity, so the result is never later than ts even when
// the shifted timestamp is negative.
func convTS(ts int64, period int) int64 {
	if period <= 0 {
		return ts
	}

	const tzOffsetMs = 8 * 3600 * 1000 // UTC+8 expressed in milliseconds

	step := int64(period) * 1000
	shifted := ts + tzOffsetMs

	// Go's % truncates towards zero; normalise the remainder so that the
	// subtraction below floors rather than truncates.
	rem := shifted % step
	if rem < 0 {
		rem += step
	}
	return shifted - rem - tzOffsetMs
}
- func (tc *tickConver) convEx(t *Tick) []*Candle {
- tg := Tk2Tg(*t)
- num := tc.cg.UpdateTick((*base.Tick)(unsafe.Pointer(&tg)))
- var tmpcandles []*Candle
- //ohlc := base.Ohlc{}
- if num == 0 {
- tc.cg.Next(&tc.ohlc)
- ohlcGo := tc.ohlc.ToGOStruct()
- tmpcandle := OhlcGo2Candle(ohlcGo)
- tmpcandles = append(tmpcandles, &tmpcandle)
- } else if num > 0 {
- for i := 0; i < num; i++ {
- tc.cg.Next(&tc.ohlc)
- ohlcGo := tc.ohlc.ToGOStruct()
- tmpcandle := OhlcGo2Candle(ohlcGo)
- tmpcandles = append(tmpcandles, &tmpcandle)
- }
- } else {
- //log.Println("tick error.")
- }
- var candles []*Candle
- for _, tmpcandle := range tmpcandles {
- if len(candles) > 0 && candles[len(candles)-1].Timestamp == tmpcandle.Timestamp {
- candles[len(candles)-1] = tmpcandle
- } else {
- candles = append(candles, tmpcandle)
- }
- }
- return candles
- }
- /*func (tc *tickConver) conv(t *Tick) *Candle {
- price := t.Price
- volumns := t.Volume
- c := tc.c
- ts := convTS(t.Timestamp, tc.period)
- if c == nil || c.Timestamp < ts { // 产生新的K线的条件
- c = &Candle{
- Open: price,
- High: price,
- Low: price,
- Close: price,
- RealVolums: t.Volume,
- TickVolums: 1,
- }
- tc.c = c
- } else if c.Timestamp == ts {
- c.High = max(c.High, price)
- c.Low = min(c.Low, price)
- c.Close = price
- c.TickVolums += 1
- c.RealVolums += volumns
- } else {
- return nil
- }
- c.Timestamp = ts
- return c
- }*/
- func (tc *tickConver) doConv() {
- for {
- t, err := tc.r.Read()
- if err != nil {
- // log.Println("doConv error:", err)
- ohlcGo := tc.ohlc.ToGOStruct()
- if ohlcGo.Time != 0 {
- candle := OhlcGo2Candle(ohlcGo)
- tc.buf.write(&candle, nil)
- }
- tc.buf.write(nil, errReadEOF)
- return
- }
- if t.Price == 0 {
- continue
- }
- c := tc.convEx(t)
- if tc.period == M1 {
- //testCandle(1, nil, t, c, true)
- }
- for _, v := range c {
- if v != nil {
- candle := *v // copy
- tc.buf.write(&candle, nil)
- }
- }
- }
- }
- func TickConv(r TickReader, insId string, period int, c *Candle) CandleReader {
- return tickConv(r, insId, period, c)
- }
- func tickConv(r TickReader, insId string, period int, c *Candle) CandleReader {
- buf := &candleBuffer{ch: make(chan candleAndErr)}
- cg, _ := base.NewCandle(period, 2, nil, 0)
- if strings.HasPrefix(insId, Ctp) {
- cg.Set(base.CANDLE_AUTOCOMPLETE_MAX, 1)
- }
- tc := &tickConver{
- r: r,
- period: period,
- buf: buf,
- c: c,
- cg: cg,
- }
- go tc.doConv()
- return buf
- }
- type tconver struct {
- ch chan *Tick
- c *Candle
- cg *base.Candle
- ohlc base.Ohlc
- insId string
- period int
- }
- func Tk2Tg(tk Tick) base.TickGo {
- var tg base.TickGo
- tg.Time = int32(tk.Timestamp / 1000)
- tg.Ms = int16(tk.Timestamp % 1000)
- tg.Symbol = 0
- tg.Bid = float32(tk.Price) //tk.Bid[0]
- //tg.Ask = float32(tk.Price) //tk.Ask[0]
- tg.Bidv = float32(tk.Volume) //tk.Bid[1]
- //tg.Askv = int32(tk.Volume) //tk.Ask[1]
- return tg
- }
- func OhlcGo2Candle(ohlcGo base.OhlcGo) Candle {
- var c Candle
- c.Timestamp = int64(ohlcGo.Time) * 1000
- c.Open = ohlcGo.Open
- c.High = ohlcGo.High
- c.Low = ohlcGo.Low
- c.Close = ohlcGo.Close
- c.TickVolums = float64(ohlcGo.TickVolumn)
- c.RealVolums = float64(ohlcGo.RealVolumn)
- return c
- }
- func Candle2OhlcGo(c Candle) base.OhlcGo {
- var ohlcGo base.OhlcGo
- ohlcGo.Time = int32(c.Timestamp / 1000)
- ohlcGo.Open = c.Open
- ohlcGo.High = c.High
- ohlcGo.Low = c.Low
- ohlcGo.Close = c.Close
- ohlcGo.TickVolumn = int64(c.TickVolums)
- ohlcGo.RealVolumn = c.RealVolums
- return ohlcGo
- }
- func (tc *tconver) convEx(t *Tick) []*Candle {
- tg := Tk2Tg(*t)
- num := tc.cg.UpdateTick((*base.Tick)(unsafe.Pointer(&tg)))
- var tmpcandles []*Candle
- //ohlc := base.Ohlc{}
- if num == 0 {
- tc.cg.Next(&tc.ohlc)
- ohlcGo := tc.ohlc.ToGOStruct()
- tmpcandle := OhlcGo2Candle(ohlcGo)
- tmpcandles = append(tmpcandles, &tmpcandle)
- } else if num > 0 {
- for i := 0; i < num; i++ {
- tc.cg.Next(&tc.ohlc)
- ohlcGo := tc.ohlc.ToGOStruct()
- tmpcandle := OhlcGo2Candle(ohlcGo)
- tmpcandles = append(tmpcandles, &tmpcandle)
- }
- } else {
- //log.Println("tick error.")
- }
- var candles []*Candle
- for _, tmpcandle := range tmpcandles {
- if len(candles) > 0 && candles[len(candles)-1].Timestamp == tmpcandle.Timestamp {
- candles[len(candles)-1] = tmpcandle
- } else {
- candles = append(candles, tmpcandle)
- }
- }
- return candles
- }
- /*func (tc *tconver) conv(t *Tick) *Candle {
- price := t.Price
- volumns := t.Volume
- c := tc.c
- ts := convTS(t.Timestamp, tc.period)
- if c == nil || c.Timestamp < ts { // 产生新的K线的条件
- c = &Candle{
- Open: price,
- High: price,
- Low: price,
- Close: price,
- RealVolums: t.Volume,
- TickVolums: 1,
- }
- tc.c = c
- } else if c.Timestamp == ts {
- c.High = max(c.High, price)
- c.Low = min(c.Low, price)
- c.Close = price
- c.TickVolums += 1
- c.RealVolums += volumns
- } else {
- return nil
- }
- c.Timestamp = ts
- return c
- }*/
- type candleConver struct {
- r CandleReader
- buf *candleBuffer
- period int
- c *Candle
- cg *base.Candle
- ohlc base.Ohlc
- }
- func (tc *candleConver) doConv() {
- for {
- c, err := tc.r.Read()
- if err != nil {
- ohlcGo := tc.ohlc.ToGOStruct()
- if ohlcGo.Time != 0 {
- candle := OhlcGo2Candle(ohlcGo)
- tc.buf.write(&candle, nil)
- }
- tc.buf.write(nil, errReadEOF)
- return
- }
- cc := tc.convEx(c)
- if tc.period == M1 {
- //testCandle(2, c, nil, cc, false)
- }
- for _, v := range cc {
- if v != nil {
- candle := *v // copy
- tc.buf.write(&candle, nil)
- }
- }
- }
- }
- var testCandleFile [6]*os.File
- func testCandle(testIndex int, candle *Candle, tick *Tick, candles []*Candle, tOrc bool) {
- if testCandleFile[2*testIndex] == nil && testCandleFile[2*testIndex+1] == nil {
- fName := fmt.Sprintf("%d", testIndex)
- originalFileName := fName + "_original.txt"
- candleFileName := fName + "_candle.txt"
- testCandleFile[2*testIndex], _ = os.Create(originalFileName)
- testCandleFile[2*testIndex+1], _ = os.Create(candleFileName)
- }
- if tOrc {
- testTime := time.Unix(tick.Timestamp/1000, 0)
- fmt.Fprintln(testCandleFile[2*testIndex], testTime, *tick)
- } else {
- testTime := time.Unix(candle.Timestamp/1000, 0)
- fmt.Fprintln(testCandleFile[2*testIndex], testTime, *candle)
- }
- for _, v := range candles {
- testTime := time.Unix(v.Timestamp/1000, 0)
- fmt.Fprintln(testCandleFile[2*testIndex+1], testTime, *v)
- }
- }
- func (tc *candleConver) convEx(candle *Candle) []*Candle {
- ohlcGo := Candle2OhlcGo(*candle)
- num := tc.cg.UpdateOhlc(ohlcGo.ToCStruct())
- var candles []*Candle
- //ohlc := base.Ohlc{}
- if num == 0 {
- tc.cg.Next(&tc.ohlc)
- //ohlcGo := tc.ohlc.ToGOStruct()
- //candle := OhlcGo2Candle(ohlcGo)
- //candles = append(candles, &candle)
- } else if num > 0 {
- for i := 0; i < num; i++ {
- ohlcGo := tc.ohlc.ToGOStruct()
- if ohlcGo.Time != 0 {
- candle := OhlcGo2Candle(ohlcGo)
- candles = append(candles, &candle)
- }
- tc.cg.Next(&tc.ohlc)
- }
- } else {
- //log.Println("tick error.")
- }
- return candles
- }
- /*func (tc *candleConver) conv(candle *Candle) *Candle {
- c := tc.c
- ts := convTS(candle.Timestamp, tc.period)
- if c == nil || c.Timestamp < ts { // new candle
- c = &Candle{}
- *c = *candle
- tc.c = c
- } else if c.Timestamp == ts { // same period
- c.High = max(c.High, candle.High)
- c.Low = min(c.Low, candle.Low)
- c.Close = candle.Close
- c.TickVolums += candle.TickVolums
- c.RealVolums += candle.RealVolums
- } else {
- return nil
- }
- c.Timestamp = ts
- return c
- }*/
- func CandleConv(r CandleReader, insId string, period int) CandleReader {
- buf := &candleBuffer{ch: make(chan candleAndErr)}
- cg, _ := base.NewCandle(period, 2, nil, 0)
- if strings.HasPrefix(insId, Ctp) {
- cg.Set(base.CANDLE_AUTOCOMPLETE_MAX, 1)
- }
- cc := &candleConver{
- r: r,
- period: period,
- buf: buf,
- cg: cg,
- }
- go cc.doConv()
- return buf
- }
- func TickConvCandle(r TickReader, insId string, period int) ([]Candle, error) {
- return tickConvCandle(r, insId, period)
- }
- func tickConvCandle(r TickReader, insId string, period int) ([]Candle, error) {
- reader := tickConv(r, insId, period, nil)
- var buf []Candle
- var prev *Candle
- for {
- candle, err := reader.Read()
- if err == errReadEOF {
- // 添加最后的Candle
- if prev != nil {
- buf = append(buf, *prev)
- }
- break
- }
- if err != nil {
- break
- }
- if prev == nil {
- prev = candle
- }
- if prev.Timestamp != candle.Timestamp { //产生新的K线
- buf = append(buf, *prev)
- }
- prev = candle
- }
- return buf, nil
- }
- func ConvPeriod(r CandleReader, insId string, period int) ([]Candle, error) {
- return convPeriod(r, insId, period)
- }
- func convPeriod(r CandleReader, insId string, period int) ([]Candle, error) {
- reader := CandleConv(r, insId, period /*, UseBid, true*/)
- var buf []Candle
- var prev *Candle
- for {
- candle, err := reader.Read()
- if err == errReadEOF {
- // 添加最后的Candle
- if prev != nil {
- buf = append(buf, *prev)
- }
- break
- }
- if err != nil {
- break
- }
- if prev == nil {
- prev = candle
- }
- if prev.Timestamp != candle.Timestamp { //产生新的K线
- buf = append(buf, *prev)
- }
- prev = candle
- }
- return buf, nil
- }
- type TickBuf struct {
- ticks []Tick // 存储tick数据
- rd int
- }
- func NewTickBuf(ticks []Tick) *TickBuf {
- return &TickBuf{ticks: ticks}
- }
- var errReadEOF = errors.New("read EOF")
- func (r *TickBuf) Read() (*Tick, error) {
- if r.rd == len(r.ticks) {
- return nil, errReadEOF
- }
- t := &r.ticks[r.rd]
- r.rd += 1
- return t, nil
- }
- type candleBuf struct {
- candles []Candle // 存储tick数据
- rd int
- }
- func NewCandleBuf(candles []Candle) *candleBuf {
- return &candleBuf{candles: candles}
- }
- func (r *candleBuf) Read() (*Candle, error) {
- if r.rd == len(r.candles) {
- return nil, errReadEOF
- }
- candle := &r.candles[r.rd]
- r.rd += 1
- return candle, nil
- }
- type BinaryReader struct {
- R io.Reader
- }
- func (r *BinaryReader) Read() (*Candle, error) {
- return ReadCandleBinary(r.R)
- }
- type TickBinaryReader struct {
- R io.Reader
- }
- func (r *TickBinaryReader) Read() (*Tick, error) {
- return ReadTickBinary(r.R)
- }
|