// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. // 本程序用来把永华的期货数据导入到tickserver中 package main import ( "database/sql" "encoding/json" "fmt" "log" "os" "path" "sort" "strings" "tickserver/server/market" ) var pair = []string{ "EURUSD", "EURGBP", "GBPUSD", "USDJPY", "USDCHF", "AUDUSD", "USDCAD", "NZDUSD", "CHFJPY", "EURJPY", "EURCHF", "EURAUD", "EURCAD", "GBPCHF", "GBPJPY", "OILUSD", "CADJPY", "AUDJPY", "AUDCAD", "AUDNZD", "XAGUSD", "XAUUSD", } const ( PTK = 0 //period index PM1 = 1 PM5 = 2 PH1 = 3 PD1 = 4 TK = 0 //period duration M1 = 1 * 60 M5 = 5 * 60 H1 = 60 * 60 D1 = 24 * 3600 ) const ( DATATYPE = "easyforex" BUFLEN = 1024 ) var PeriodMap = map[int]int{ PTK: TK, PM1: M1, PM5: M5, PH1: H1, PD1: D1, } var PeriodNameMap = map[int]string{ TK: "Tick", M1: "M1", M5: "M5", H1: "H1", D1: "D1", } type Tick struct { //tick结构 time int64 buyPercent float64 } type Conf struct { DSN string // dsn = fmt.Sprintf("root:fzm@1001@/%s?charset=%s", dbName, "utf8") DataDir string } type byTime []market.Candle func (a byTime) Len() int { return len(a) } func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byTime) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp } var conf *Conf func main() { // set log /*logF, err := os.Create("./hisconv.log.txt") if err != nil { log.Fatal(err) } defer logF.Close() log.SetOutput(logF)*/ var err error conf, err = ReadConf() if err != nil { log.Fatal(err) } go getTick() select {} } func connectDB(dsn string) (*sql.DB, error) { db, err := sql.Open("mysql", dsn) if err != nil { return nil, err } return db, nil } func getInstrumentName(symbol string) string { return DATATYPE + "_" + symbol } func getTableName(symbol string, period int) string { tableName := symbol tableName += "_" tableName += PeriodNameMap[period] return strings.ToLower(tableName) } func getTickFromDB(symbol string, period int, tkCh chan<- Tick) error { db, err := connectDB(conf.DSN) if err != nil { log.Println("error: connect db", symbol, err) return err } defer db.Close() tableName := getTableName(symbol, period) var offset int for { queryString := fmt.Sprintf("SELECT time,buy_percent FROM %s order by time ASC limit %d,1000", tableName, offset) //log.Println(queryString) rows, err := db.Query(queryString) if err != nil { log.Println("error: query", symbol, err) return err } defer rows.Close() var count int for rows.Next() { var tk Tick if err := rows.Scan(&tk.time, &tk.buyPercent); err != nil { log.Println("error: scan", symbol, err) return err } tkCh <- tk count++ } offset += count if count < 1000 { break } } log.Println("source tick of", symbol, "finished.") tk := Tick{-1, 0.0} //notify save goroutine there's no data anymore tkCh <- tk return nil } func processTick(symbol string, tkCh <-chan Tick) { var ticks []market.Tick insId := getInstrumentName(symbol) var last int64 for { tk := <-tkCh if tk.time == -1 || ((tk.time/(3600*24) != last) && (last != 0)) { if len(ticks) > 0 { fname, err := market.SaveTickEx(conf.DataDir, ticks, insId, true) if err != nil { log.Println("error: savetickex", symbol, fname, err) } err = convAndSaveCandles(insId, ticks) if err != nil { log.Println("error: convandsavecandles", symbol, err) } ticks = nil } if tk.time == -1 { break } } var mk market.Tick mk.Timestamp = tk.time * 1000 mk.Price = 100 - tk.buyPercent ticks = append(ticks, mk) last = tk.time / (3600 * 24) } } func getTick() (err error) { for _, symbol := range pair { tkCh := make(chan Tick, 1024) go getTickFromDB(symbol, TK, tkCh) go processTick(symbol, tkCh) } return nil } func convAndSaveCandles(insId string, ticks []market.Tick) error { //refer := path var candles []market.Candle pa := []int{market.M1, market.M5, market.H1, market.D1} for _, period := range pa { var err error if period == market.M1 { candles, err = convCandles0(ticks, insId, market.M1) } else { candles, err = convCandles1(candles, insId, period) } if err != nil { return err } if period == market.D1 { dir := path.Join(conf.DataDir, insId) os.MkdirAll(dir, 0777) var bname string bname = fmt.Sprintf("%s.gz", market.PeriodNameMap[period]) fname := path.Join(dir, bname) candles, _ = combinEx(fname, candles) } _, err = market.SaveCandlesEx(conf.DataDir, insId, candles, period, true) if err != nil { return err } } return nil } func convCandles0(ticks []market.Tick, insId string, period int) ([]market.Candle, error) { r := market.NewTickBuf(ticks) return market.TickConvCandle(r, insId, period) } func convCandles1(candles []market.Candle, insId string, period int) ([]market.Candle, error) { r := market.NewCandleBuf(candles) return market.ConvPeriod(r, insId, period) } func ReadConf() (*Conf, error) { f, err := os.Open("efhisconv.json") if err != nil { return nil, err } defer f.Close() dec := json.NewDecoder(f) conf := &Conf{} err = dec.Decode(conf) if err != nil { return nil, err } return conf, nil } func combinEx(filename string, candles []market.Candle) ([]market.Candle, error) { buf, err := market.ReadCandleFile(filename) if err != nil { return candles, err } candles = append(buf, candles[:]...) sort.Sort(byTime(candles)) return candles, nil }