// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package market import ( "errors" "fmt" "io" //"log" "os" "strings" "time" "unsafe" "tickserver/framework/base" ) // pconv.go 实现周期的转换, 包括tick到任意周期, 以及低周期向高周期的转换 // 周期定义 const ( TK = 0 S1 = 1 S2 = 2 S3 = 3 S4 = 4 S5 = 5 S15 = 15 S30 = 30 M1 = 1 * 60 M2 = 2 * 60 M3 = 3 * 60 M4 = 4 * 60 M5 = 5 * 60 M15 = 15 * 60 M30 = 30 * 60 H1 = 60 * 60 H2 = 2 * 60 * 60 H4 = 4 * 60 * 60 D1 = 24 * 3600 W1 = 7 * 24 * 3600 MN1 = 30 * 24 * 3600 ) var PeriodNameMap = map[int]string{ TK: "Tick", S1: "S1", S2: "S2", S3: "S3", S4: "S4", S5: "S5", S15: "S15", S30: "S30", M1: "M1", M2: "M2", M3: "M3", M4: "M4", M5: "M5", M15: "M15", M30: "M30", H1: "H1", H2: "H2", H4: "H4", D1: "D1", W1: "W1", MN1: "MN1", } var PeriodIdMap = map[string]int{ "Tick": TK, "S1": S1, "S2": S2, "S3": S3, "S4": S4, "S5": S5, "S15": S15, "S30": S30, "M1": M1, "M2": M2, "M3": M3, "M4": M4, "M5": M5, "M15": M15, "M30": M30, "H1": H1, "H2": H2, "H4": H4, "D1": D1, "W1": W1, "MN1": MN1, } // Tick到周期的转换 type TickReader interface { Read() (*Tick, error) } type CandleReader interface { Read() (*Candle, error) } type TickBuffer struct { ch chan *Tick } func NewTickBuffer(ch chan *Tick) *TickBuffer { return &TickBuffer{ch} } func (buf *TickBuffer) Write(t *Tick) { buf.ch <- t } func (buf *TickBuffer) Read() (*Tick, error) { t := <-buf.ch if t == nil { return nil, errReadEOF } return t, nil } type candleAndErr struct { candle *Candle err error } type candleBuffer struct { ch chan candleAndErr } func (buf *candleBuffer) Read() (*Candle, error) { ce := <-buf.ch return ce.candle, ce.err } func (buf *candleBuffer) write(candle *Candle, err error) { buf.ch <- candleAndErr{candle, err} } func max(p1, p2 float64) float64 { if p1 < p2 { return p2 } return p1 } func min(p1, p2 float64) float64 { if p1 > p2 { return p2 } return p1 } type tickConver struct { r TickReader buf *candleBuffer period int c *Candle cg *base.Candle ohlc base.Ohlc } // 用来转换周期 func convTS(ts int64, period int) int64 { ts = (3600000*8 + ts) / (int64(period) * 1000) * (int64(period) * 1000) // 时间按照周期取整 ts -= 3600000 * 8 return ts } func (tc *tickConver) convEx(t *Tick) []*Candle { tg := Tk2Tg(*t) num := tc.cg.UpdateTick((*base.Tick)(unsafe.Pointer(&tg))) var tmpcandles []*Candle //ohlc := base.Ohlc{} if num == 0 { tc.cg.Next(&tc.ohlc) ohlcGo := tc.ohlc.ToGOStruct() tmpcandle := OhlcGo2Candle(ohlcGo) tmpcandles = append(tmpcandles, &tmpcandle) } else if num > 0 { for i := 0; i < num; i++ { tc.cg.Next(&tc.ohlc) ohlcGo := tc.ohlc.ToGOStruct() tmpcandle := OhlcGo2Candle(ohlcGo) tmpcandles = append(tmpcandles, &tmpcandle) } } else { //log.Println("tick error.") } var candles []*Candle for _, tmpcandle := range tmpcandles { if len(candles) > 0 && candles[len(candles)-1].Timestamp == tmpcandle.Timestamp { candles[len(candles)-1] = tmpcandle } else { candles = append(candles, tmpcandle) } } return candles } /*func (tc *tickConver) conv(t *Tick) *Candle { price := t.Price volumns := t.Volume c := tc.c ts := convTS(t.Timestamp, tc.period) if c == nil || c.Timestamp < ts { // 产生新的K线的条件 c = &Candle{ Open: price, High: price, Low: price, Close: price, RealVolums: t.Volume, TickVolums: 1, } tc.c = c } else if c.Timestamp == ts { c.High = max(c.High, price) c.Low = min(c.Low, price) c.Close = price c.TickVolums += 1 c.RealVolums += volumns } else { return nil } c.Timestamp = ts return c }*/ func (tc *tickConver) doConv() { for { t, err := tc.r.Read() if err != nil { // log.Println("doConv error:", err) ohlcGo := tc.ohlc.ToGOStruct() if ohlcGo.Time != 0 { candle := OhlcGo2Candle(ohlcGo) tc.buf.write(&candle, nil) } tc.buf.write(nil, errReadEOF) return } if t.Price == 0 { continue } c := tc.convEx(t) if tc.period == M1 { //testCandle(1, nil, t, c, true) } for _, v := range c { if v != nil { candle := *v // copy tc.buf.write(&candle, nil) } } } } func TickConv(r TickReader, insId string, period int, c *Candle) CandleReader { return tickConv(r, insId, period, c) } func tickConv(r TickReader, insId string, period int, c *Candle) CandleReader { buf := &candleBuffer{ch: make(chan candleAndErr)} cg, _ := base.NewCandle(period, 2, nil, 0) if strings.HasPrefix(insId, Ctp) { cg.Set(base.CANDLE_AUTOCOMPLETE_MAX, 1) } tc := &tickConver{ r: r, period: period, buf: buf, c: c, cg: cg, } go tc.doConv() return buf } type tconver struct { ch chan *Tick c *Candle cg *base.Candle ohlc base.Ohlc insId string period int } func Tk2Tg(tk Tick) base.TickGo { var tg base.TickGo tg.Time = int32(tk.Timestamp / 1000) tg.Ms = int16(tk.Timestamp % 1000) tg.Symbol = 0 tg.Bid = float32(tk.Price) //tk.Bid[0] //tg.Ask = float32(tk.Price) //tk.Ask[0] tg.Bidv = float32(tk.Volume) //tk.Bid[1] //tg.Askv = int32(tk.Volume) //tk.Ask[1] return tg } func OhlcGo2Candle(ohlcGo base.OhlcGo) Candle { var c Candle c.Timestamp = int64(ohlcGo.Time) * 1000 c.Open = ohlcGo.Open c.High = ohlcGo.High c.Low = ohlcGo.Low c.Close = ohlcGo.Close c.TickVolums = float64(ohlcGo.TickVolumn) c.RealVolums = float64(ohlcGo.RealVolumn) return c } func Candle2OhlcGo(c Candle) base.OhlcGo { var ohlcGo base.OhlcGo ohlcGo.Time = int32(c.Timestamp / 1000) ohlcGo.Open = c.Open ohlcGo.High = c.High ohlcGo.Low = c.Low ohlcGo.Close = c.Close ohlcGo.TickVolumn = int64(c.TickVolums) ohlcGo.RealVolumn = c.RealVolums return ohlcGo } func (tc *tconver) convEx(t *Tick) []*Candle { tg := Tk2Tg(*t) num := tc.cg.UpdateTick((*base.Tick)(unsafe.Pointer(&tg))) var tmpcandles []*Candle //ohlc := base.Ohlc{} if num == 0 { tc.cg.Next(&tc.ohlc) ohlcGo := tc.ohlc.ToGOStruct() tmpcandle := OhlcGo2Candle(ohlcGo) tmpcandles = append(tmpcandles, &tmpcandle) } else if num > 0 { for i := 0; i < num; i++ { tc.cg.Next(&tc.ohlc) ohlcGo := tc.ohlc.ToGOStruct() tmpcandle := OhlcGo2Candle(ohlcGo) tmpcandles = append(tmpcandles, &tmpcandle) } } else { //log.Println("tick error.") } var candles []*Candle for _, tmpcandle := range tmpcandles { if len(candles) > 0 && candles[len(candles)-1].Timestamp == tmpcandle.Timestamp { candles[len(candles)-1] = tmpcandle } else { candles = append(candles, tmpcandle) } } return candles } /*func (tc *tconver) conv(t *Tick) *Candle { price := t.Price volumns := t.Volume c := tc.c ts := convTS(t.Timestamp, tc.period) if c == nil || c.Timestamp < ts { // 产生新的K线的条件 c = &Candle{ Open: price, High: price, Low: price, Close: price, RealVolums: t.Volume, TickVolums: 1, } tc.c = c } else if c.Timestamp == ts { c.High = max(c.High, price) c.Low = min(c.Low, price) c.Close = price c.TickVolums += 1 c.RealVolums += volumns } else { return nil } c.Timestamp = ts return c }*/ type candleConver struct { r CandleReader buf *candleBuffer period int c *Candle cg *base.Candle ohlc base.Ohlc } func (tc *candleConver) doConv() { for { c, err := tc.r.Read() if err != nil { ohlcGo := tc.ohlc.ToGOStruct() if ohlcGo.Time != 0 { candle := OhlcGo2Candle(ohlcGo) tc.buf.write(&candle, nil) } tc.buf.write(nil, errReadEOF) return } cc := tc.convEx(c) if tc.period == M1 { //testCandle(2, c, nil, cc, false) } for _, v := range cc { if v != nil { candle := *v // copy tc.buf.write(&candle, nil) } } } } var testCandleFile [6]*os.File func testCandle(testIndex int, candle *Candle, tick *Tick, candles []*Candle, tOrc bool) { if testCandleFile[2*testIndex] == nil && testCandleFile[2*testIndex+1] == nil { fName := fmt.Sprintf("%d", testIndex) originalFileName := fName + "_original.txt" candleFileName := fName + "_candle.txt" testCandleFile[2*testIndex], _ = os.Create(originalFileName) testCandleFile[2*testIndex+1], _ = os.Create(candleFileName) } if tOrc { testTime := time.Unix(tick.Timestamp/1000, 0) fmt.Fprintln(testCandleFile[2*testIndex], testTime, *tick) } else { testTime := time.Unix(candle.Timestamp/1000, 0) fmt.Fprintln(testCandleFile[2*testIndex], testTime, *candle) } for _, v := range candles { testTime := time.Unix(v.Timestamp/1000, 0) fmt.Fprintln(testCandleFile[2*testIndex+1], testTime, *v) } } func (tc *candleConver) convEx(candle *Candle) []*Candle { ohlcGo := Candle2OhlcGo(*candle) num := tc.cg.UpdateOhlc(ohlcGo.ToCStruct()) var candles []*Candle //ohlc := base.Ohlc{} if num == 0 { tc.cg.Next(&tc.ohlc) //ohlcGo := tc.ohlc.ToGOStruct() //candle := OhlcGo2Candle(ohlcGo) //candles = append(candles, &candle) } else if num > 0 { for i := 0; i < num; i++ { ohlcGo := tc.ohlc.ToGOStruct() if ohlcGo.Time != 0 { candle := OhlcGo2Candle(ohlcGo) candles = append(candles, &candle) } tc.cg.Next(&tc.ohlc) } } else { //log.Println("tick error.") } return candles } /*func (tc *candleConver) conv(candle *Candle) *Candle { c := tc.c ts := convTS(candle.Timestamp, tc.period) if c == nil || c.Timestamp < ts { // new candle c = &Candle{} *c = *candle tc.c = c } else if c.Timestamp == ts { // same period c.High = max(c.High, candle.High) c.Low = min(c.Low, candle.Low) c.Close = candle.Close c.TickVolums += candle.TickVolums c.RealVolums += candle.RealVolums } else { return nil } c.Timestamp = ts return c }*/ func CandleConv(r CandleReader, insId string, period int) CandleReader { buf := &candleBuffer{ch: make(chan candleAndErr)} cg, _ := base.NewCandle(period, 2, nil, 0) if strings.HasPrefix(insId, Ctp) { cg.Set(base.CANDLE_AUTOCOMPLETE_MAX, 1) } cc := &candleConver{ r: r, period: period, buf: buf, cg: cg, } go cc.doConv() return buf } func TickConvCandle(r TickReader, insId string, period int) ([]Candle, error) { return tickConvCandle(r, insId, period) } func tickConvCandle(r TickReader, insId string, period int) ([]Candle, error) { reader := tickConv(r, insId, period, nil) var buf []Candle var prev *Candle for { candle, err := reader.Read() if err == errReadEOF { // 添加最后的Candle if prev != nil { buf = append(buf, *prev) } break } if err != nil { break } if prev == nil { prev = candle } if prev.Timestamp != candle.Timestamp { //产生新的K线 buf = append(buf, *prev) } prev = candle } return buf, nil } func ConvPeriod(r CandleReader, insId string, period int) ([]Candle, error) { return convPeriod(r, insId, period) } func convPeriod(r CandleReader, insId string, period int) ([]Candle, error) { reader := CandleConv(r, insId, period /*, UseBid, true*/) var buf []Candle var prev *Candle for { candle, err := reader.Read() if err == errReadEOF { // 添加最后的Candle if prev != nil { buf = append(buf, *prev) } break } if err != nil { break } if prev == nil { prev = candle } if prev.Timestamp != candle.Timestamp { //产生新的K线 buf = append(buf, *prev) } prev = candle } return buf, nil } type TickBuf struct { ticks []Tick // 存储tick数据 rd int } func NewTickBuf(ticks []Tick) *TickBuf { return &TickBuf{ticks: ticks} } var errReadEOF = errors.New("read EOF") func (r *TickBuf) Read() (*Tick, error) { if r.rd == len(r.ticks) { return nil, errReadEOF } t := &r.ticks[r.rd] r.rd += 1 return t, nil } type candleBuf struct { candles []Candle // 存储tick数据 rd int } func NewCandleBuf(candles []Candle) *candleBuf { return &candleBuf{candles: candles} } func (r *candleBuf) Read() (*Candle, error) { if r.rd == len(r.candles) { return nil, errReadEOF } candle := &r.candles[r.rd] r.rd += 1 return candle, nil } type BinaryReader struct { R io.Reader } func (r *BinaryReader) Read() (*Candle, error) { return ReadCandleBinary(r.R) } type TickBinaryReader struct { R io.Reader } func (r *TickBinaryReader) Read() (*Tick, error) { return ReadTickBinary(r.R) }