// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. // 本程序用来把永华的期货数据导入到tickserver中 package main import ( "encoding/csv" "errors" "flag" "fmt" "io" "log" "os" "path" "path/filepath" "runtime" "runtime/pprof" "sort" "strconv" "strings" "time" "tickserver/server/market" ) type byTime []market.Candle func (a byTime) Len() int { return len(a) } func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byTime) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp } var exchangeMap = map[string]string{ "DL": "大商所", "SQ": "上期所", "ZJ": "中金所", "ZZ": "郑商所", "000300": "", } var sdir = flag.String("s", "ctp_history_data", "src ctp history data file path") var ddir = flag.String("d", "fzmnew", "dst ctp history data file path") var ngo = flag.Int("n", 4, "n goroutine conv data into tickserver") var dbg = flag.Bool("g", false, "debug use sqlite db") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") func main() { flag.Parse() if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { log.Fatal(err) } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } // set log /* logF, err := os.Create("./hisconv.log.txt") if err != nil { log.Fatal(err) } defer logF.Close() log.SetOutput(logF) */ log.Println(*sdir, *ddir, *ngo, *dbg) ch := make(chan string, 1) go func() { filepath.Walk(*sdir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { ch <- path } return nil }) close(ch) }() log.Fatal(run(*ddir, 1, ch)) //*ngo } func run(ddir string, n int, ch chan string) error { f := func(done chan bool) { for { path, ok := <-ch if !ok { done <- true break } // 解析文件 name := filepath.Base(path) log.Println("beg parse::", name) ticks, err := parseFile(path) log.Println("end parse::", name) if err != nil { log.Println(err) continue } // 解析insId ex := "DL" for ex, _ = range exchangeMap { i := strings.Index(path, ex) if i != -1 { path = path[i:] break } } insId, err := parseInsId(path, name, ex) if err != nil { log.Println(err) } // 保存tick数据 path = filepath.Join(ddir, market.Ctp) os.MkdirAll(path, os.ModePerm) //path = filepath.Join(path, name) log.Println("beg save::", name) _, err = market.SaveTickEx(path, ticks, insId, true) if err != nil { log.Println(err, path) continue } log.Println("end save::", name) // 保存K线数据 log.Println("beg cand::", name) err = convAndSaveCandles(insId, ex, ticks) if err != nil { log.Println(err, path) } log.Println("end cand::", name) } } if n < 1 { n = runtime.NumCPU() } runtime.GOMAXPROCS(n) done := make(chan bool, n) for i := 0; i < n; i++ { go f(done) } for i := 0; i < n; i++ { <-done } return nil } func parseTime(stime string) (time.Time, error) { date := strings.Replace(stime, "/", "-", -1) tpl := "2006-1-2 15:04:05" if isZeroPad(stime) { tpl = "2006-01-02 15:04:05" } return time.Parse(tpl, date) } func isZeroPad(stime string) bool { date := strings.Split(stime, " ") if len(date) == 2 { return len(date[0]) == 10 } return false } func parseFile(path string) ([]market.Tick, error) { if !strings.HasSuffix(path, "csv") { return nil, errors.New("history file data format error, must csv file " + path) } file, err := os.Open(path) if err != nil { return nil, err } defer file.Close() skipheader := true r := csv.NewReader(file) ticks := []market.Tick{} for { data, err := r.Read() if err == io.EOF { break } if err != nil { return nil, err } if skipheader { skipheader = false continue } t, err := parseTick(data) if err != nil { log.Println(err, path) continue } ticks = append(ticks, *t) } return ticks, nil } func convDate(st string) (*time.Time, error) { if len(st) < 6 { return nil, errors.New(st + " is error format. MUST yyyymmdd") } sy := st[:4] sm := st[4:6] sd := st[6:] y, _ := strconv.ParseInt(string(sy), 10, 64) m, _ := strconv.ParseInt(string(sm), 10, 64) d, _ := strconv.ParseInt(string(sd), 10, 64) t := time.Date(int(y), time.Month(m), int(d), 0, 0, 0, 0, time.Local) return &t, nil } func getInsId(ex, s string) string { insId := "" if ex == "DL" || ex == "SQ" { insId = strings.ToLower(s) } else { insId = strings.ToUpper(s) } return market.CtpPrefix + insId } func parseInsId(path, name, ex string) (string, error) { nameError := errors.New(name + " file name error. must xx_yyyymmdd.csv format") k := ex ss := strings.Split(name, "_") // xx_yyyymmdd.csv if len(ss) != 2 { return "", nameError } id := ss[0] if len(id) < 3 { return "", nameError } pid := id[:len(id)-2] if strings.HasSuffix(id, "MI") || strings.HasSuffix(id, "mi") { // 主力连续 return getInsId(k, pid) + "MI", nil } sidt := id[len(id)-2:] idt, err := strconv.Atoi(sidt) if err != nil { return "", nameError } if idt > 12 { // 指标 return getInsId(k, id), nil } ss = strings.Split(ss[1], ".") // yyyymmdd.csv if len(ss) != 2 { return "", nameError } t, err := convDate(ss[0]) if err != nil { return "", nameError } y := t.Year() if strings.HasSuffix(pid, "x") || strings.HasSuffix(pid, "X") { // aX01 ==> a1601 x偶数年 if y%2 == 0 { if int(t.Month()) > idt { y += 2 } } else { y += 1 } pid = pid[:len(pid)-1] // remove x } else if strings.HasSuffix(pid, "Y") || strings.HasSuffix(pid, "Y") { // aY01 ==> a1501 y奇数年 if pid != "Y" || pid != "y" { if y%2 == 0 { y += 1 } else { if int(t.Month()) > idt { y += 2 } } pid = pid[:len(pid)-1] // remove y } } else if int(t.Month()) > idt { y += 1 } sy := strconv.Itoa(y) insId := pid + sy[2:] + sidt // "a" + "16" + "01" = a1601 return getInsId(k, insId), nil } // 日期 时间 成交价 成交量 总量 属性(持仓增减) B1价 B1量 B2价 B2量 B3价 B3量 S1价 S1量 S2价 S2量 S3价 S3量 BS func parseTick(data []string) (*market.Tick, error) { if len(data) < 13 { return nil, errors.New("len(data) < 13") } stime := data[0] + " " + data[1] t, err := parseTime(stime) if err != nil { return nil, err } price, err := strconv.ParseFloat(data[2], 64) if err != nil { return nil, err } vol, err := strconv.ParseFloat(data[3], 64) if err != nil { return nil, err } bidp1, err := strconv.ParseFloat(data[6], 64) if err != nil { return nil, err } bidv1, err := strconv.ParseFloat(data[7], 64) if err != nil { return nil, err } askpi, err := strconv.ParseFloat(data[12], 64) if err != nil { return nil, err } askv1, err := strconv.ParseFloat(data[13], 64) if err != nil { return nil, err } tick := &market.Tick{ Timestamp: (t.Unix() - 3600*8) * 1000, // to utc time Price: price, Volume: vol, Bid: market.PP{bidp1, bidv1}, Ask: market.PP{askpi, askv1}, } return tick, nil } func convAndSaveCandles(insId, ex string, ticks []market.Tick) error { var candles []market.Candle pa := []int{market.M1, market.M5, market.H1, market.D1} for _, period := range pa { var err error if period == market.M1 { candles, err = convCandles0(ticks, insId, market.M1) } else { candles, err = convCandles1(candles, insId, period) } if err != nil { return err } newpath := filepath.Join(*ddir, market.Ctp) os.MkdirAll(newpath, os.ModePerm) if period == market.D1 { dir := path.Join(newpath, insId) os.MkdirAll(dir, 0777) var bname string bname = fmt.Sprintf("%s.gz", market.PeriodNameMap[period]) fname := path.Join(dir, bname) candles, _ = combinEx(fname, candles) } _, err = market.SaveCandlesEx(newpath, insId, candles, period, true) if err != nil { return err } } return nil } func convCandles0(ticks []market.Tick, insId string, period int) ([]market.Candle, error) { r := market.NewTickBuf(ticks) return market.TickConvCandle(r, insId, period) } func convCandles1(candles []market.Candle, insId string, period int) ([]market.Candle, error) { r := market.NewCandleBuf(candles) return market.ConvPeriod(r, insId, period) } func combinEx(filename string, candles []market.Candle) ([]market.Candle, error) { buf, err := market.ReadCandleFile(filename) if err != nil { return candles, err } candles = append(buf, candles[:]...) sort.Sort(byTime(candles)) return candles, nil }