123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
// This program imports Yonghua (永华) futures data into tickserver.
- package main
- import (
- "encoding/csv"
- "errors"
- "flag"
- "fmt"
- "io"
- "log"
- "os"
- "path"
- "path/filepath"
- "runtime"
- "runtime/pprof"
- "sort"
- "strconv"
- "strings"
- "time"
- "tickserver/server/market"
- )
- type byTime []market.Candle
- func (a byTime) Len() int { return len(a) }
- func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
- func (a byTime) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp }
// exchangeMap lists the exchange codes that run searches for inside a source
// file path. Only the keys are consulted in the visible code; the values are
// the Chinese display names of the exchanges.
var exchangeMap = map[string]string{
	// Dalian Commodity Exchange.
	"DL": "大商所",
	// Shanghai Futures Exchange.
	"SQ": "上期所",
	// China Financial Futures Exchange.
	"ZJ": "中金所",
	// Zhengzhou Commodity Exchange.
	"ZZ": "郑商所",
	// No display name; presumably the CSI 300 index code (TODO confirm).
	"000300": "",
}
// Command-line flags of the importer.
var (
	// sdir is the root directory that main walks for source history files.
	sdir = flag.String("s", "ctp_history_data", "src ctp history data file path")

	// ddir is the root directory the converted data is written under.
	ddir = flag.String("d", "fzmnew", "dst ctp history data file path")

	// ngo is the requested number of worker goroutines. In the visible code
	// main only logs it and starts a single worker.
	ngo = flag.Int("n", 4, "n goroutine conv data into tickserver")

	// dbg is a debug switch; the visible code only logs its value.
	dbg = flag.Bool("g", false, "debug use sqlite db")

	// cpuprofile, when non-empty, names the file a CPU profile is written to.
	cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
)
- func main() {
- flag.Parse()
- if *cpuprofile != "" {
- f, err := os.Create(*cpuprofile)
- if err != nil {
- log.Fatal(err)
- }
- pprof.StartCPUProfile(f)
- defer pprof.StopCPUProfile()
- }
- // set log
- /*
- logF, err := os.Create("./hisconv.log.txt")
- if err != nil {
- log.Fatal(err)
- }
- defer logF.Close()
- log.SetOutput(logF)
- */
- log.Println(*sdir, *ddir, *ngo, *dbg)
- ch := make(chan string, 1)
- go func() {
- filepath.Walk(*sdir, func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if !info.IsDir() {
- ch <- path
- }
- return nil
- })
- close(ch)
- }()
- log.Fatal(run(*ddir, 1, ch)) //*ngo
- }
- func run(ddir string, n int, ch chan string) error {
- f := func(done chan bool) {
- for {
- path, ok := <-ch
- if !ok {
- done <- true
- break
- }
- // 解析文件
- name := filepath.Base(path)
- log.Println("beg parse::", name)
- ticks, err := parseFile(path)
- log.Println("end parse::", name)
- if err != nil {
- log.Println(err)
- continue
- }
- // 解析insId
- ex := "DL"
- for ex, _ = range exchangeMap {
- i := strings.Index(path, ex)
- if i != -1 {
- path = path[i:]
- break
- }
- }
- insId, err := parseInsId(path, name, ex)
- if err != nil {
- log.Println(err)
- }
- // 保存tick数据
- path = filepath.Join(ddir, market.Ctp)
- os.MkdirAll(path, os.ModePerm)
- //path = filepath.Join(path, name)
- log.Println("beg save::", name)
- _, err = market.SaveTickEx(path, ticks, insId, true)
- if err != nil {
- log.Println(err, path)
- continue
- }
- log.Println("end save::", name)
- // 保存K线数据
- log.Println("beg cand::", name)
- err = convAndSaveCandles(insId, ex, ticks)
- if err != nil {
- log.Println(err, path)
- }
- log.Println("end cand::", name)
- }
- }
- if n < 1 {
- n = runtime.NumCPU()
- }
- runtime.GOMAXPROCS(n)
- done := make(chan bool, n)
- for i := 0; i < n; i++ {
- go f(done)
- }
- for i := 0; i < n; i++ {
- <-done
- }
- return nil
- }
- func parseTime(stime string) (time.Time, error) {
- date := strings.Replace(stime, "/", "-", -1)
- tpl := "2006-1-2 15:04:05"
- if isZeroPad(stime) {
- tpl = "2006-01-02 15:04:05"
- }
- return time.Parse(tpl, date)
- }
// isZeroPad reports whether stime consists of exactly two space-separated
// parts and the first part is 10 bytes long, which is the length of a
// zero-padded yyyy-mm-dd (or yyyy/mm/dd) date.
func isZeroPad(stime string) bool {
	// Exactly one space means exactly two parts.
	if strings.Count(stime, " ") != 1 {
		return false
	}
	// The length of the first part equals the position of that space.
	return strings.Index(stime, " ") == 10
}
- func parseFile(path string) ([]market.Tick, error) {
- if !strings.HasSuffix(path, "csv") {
- return nil, errors.New("history file data format error, must csv file " + path)
- }
- file, err := os.Open(path)
- if err != nil {
- return nil, err
- }
- defer file.Close()
- skipheader := true
- r := csv.NewReader(file)
- ticks := []market.Tick{}
- for {
- data, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return nil, err
- }
- if skipheader {
- skipheader = false
- continue
- }
- t, err := parseTick(data)
- if err != nil {
- log.Println(err, path)
- continue
- }
- ticks = append(ticks, *t)
- }
- return ticks, nil
- }
// convDate parses a date written as exactly eight digits, yyyymmdd, and
// returns midnight of that day in the local time zone.
//
// Anything that is not a valid yyyymmdd date (wrong length, non-digit
// characters, month or day out of range) is rejected with an error instead of
// being silently turned into a different date.
func convDate(st string) (*time.Time, error) {
	t, err := time.ParseInLocation("20060102", st, time.Local)
	if err != nil {
		return nil, errors.New(st + " is error format. MUST yyyymmdd")
	}
	return &t, nil
}
- func getInsId(ex, s string) string {
- insId := ""
- if ex == "DL" || ex == "SQ" {
- insId = strings.ToLower(s)
- } else {
- insId = strings.ToUpper(s)
- }
- return market.CtpPrefix + insId
- }
- func parseInsId(path, name, ex string) (string, error) {
- nameError := errors.New(name + " file name error. must xx_yyyymmdd.csv format")
- k := ex
- ss := strings.Split(name, "_") // xx_yyyymmdd.csv
- if len(ss) != 2 {
- return "", nameError
- }
- id := ss[0]
- if len(id) < 3 {
- return "", nameError
- }
- pid := id[:len(id)-2]
- if strings.HasSuffix(id, "MI") || strings.HasSuffix(id, "mi") { // 主力连续
- return getInsId(k, pid) + "MI", nil
- }
- sidt := id[len(id)-2:]
- idt, err := strconv.Atoi(sidt)
- if err != nil {
- return "", nameError
- }
- if idt > 12 { // 指标
- return getInsId(k, id), nil
- }
- ss = strings.Split(ss[1], ".") // yyyymmdd.csv
- if len(ss) != 2 {
- return "", nameError
- }
- t, err := convDate(ss[0])
- if err != nil {
- return "", nameError
- }
- y := t.Year()
- if strings.HasSuffix(pid, "x") || strings.HasSuffix(pid, "X") { // aX01 ==> a1601 x偶数年
- if y%2 == 0 {
- if int(t.Month()) > idt {
- y += 2
- }
- } else {
- y += 1
- }
- pid = pid[:len(pid)-1] // remove x
- } else if strings.HasSuffix(pid, "Y") || strings.HasSuffix(pid, "Y") { // aY01 ==> a1501 y奇数年
- if pid != "Y" || pid != "y" {
- if y%2 == 0 {
- y += 1
- } else {
- if int(t.Month()) > idt {
- y += 2
- }
- }
- pid = pid[:len(pid)-1] // remove y
- }
- } else if int(t.Month()) > idt {
- y += 1
- }
- sy := strconv.Itoa(y)
- insId := pid + sy[2:] + sidt // "a" + "16" + "01" = a1601
- return getInsId(k, insId), nil
- }
- // 日期 时间 成交价 成交量 总量 属性(持仓增减) B1价 B1量 B2价 B2量 B3价 B3量 S1价 S1量 S2价 S2量 S3价 S3量 BS
- func parseTick(data []string) (*market.Tick, error) {
- if len(data) < 13 {
- return nil, errors.New("len(data) < 13")
- }
- stime := data[0] + " " + data[1]
- t, err := parseTime(stime)
- if err != nil {
- return nil, err
- }
- price, err := strconv.ParseFloat(data[2], 64)
- if err != nil {
- return nil, err
- }
- vol, err := strconv.ParseFloat(data[3], 64)
- if err != nil {
- return nil, err
- }
- bidp1, err := strconv.ParseFloat(data[6], 64)
- if err != nil {
- return nil, err
- }
- bidv1, err := strconv.ParseFloat(data[7], 64)
- if err != nil {
- return nil, err
- }
- askpi, err := strconv.ParseFloat(data[12], 64)
- if err != nil {
- return nil, err
- }
- askv1, err := strconv.ParseFloat(data[13], 64)
- if err != nil {
- return nil, err
- }
- tick := &market.Tick{
- Timestamp: (t.Unix() - 3600*8) * 1000, // to utc time
- Price: price,
- Volume: vol,
- Bid: market.PP{bidp1, bidv1},
- Ask: market.PP{askpi, askv1},
- }
- return tick, nil
- }
- func convAndSaveCandles(insId, ex string, ticks []market.Tick) error {
- var candles []market.Candle
- pa := []int{market.M1, market.M5, market.H1, market.D1}
- for _, period := range pa {
- var err error
- if period == market.M1 {
- candles, err = convCandles0(ticks, insId, market.M1)
- } else {
- candles, err = convCandles1(candles, insId, period)
- }
- if err != nil {
- return err
- }
- newpath := filepath.Join(*ddir, market.Ctp)
- os.MkdirAll(newpath, os.ModePerm)
- if period == market.D1 {
- dir := path.Join(newpath, insId)
- os.MkdirAll(dir, 0777)
- var bname string
- bname = fmt.Sprintf("%s.gz", market.PeriodNameMap[period])
- fname := path.Join(dir, bname)
- candles, _ = combinEx(fname, candles)
- }
- _, err = market.SaveCandlesEx(newpath, insId, candles, period, true)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func convCandles0(ticks []market.Tick, insId string, period int) ([]market.Candle, error) {
- r := market.NewTickBuf(ticks)
- return market.TickConvCandle(r, insId, period)
- }
- func convCandles1(candles []market.Candle, insId string, period int) ([]market.Candle, error) {
- r := market.NewCandleBuf(candles)
- return market.ConvPeriod(r, insId, period)
- }
- func combinEx(filename string, candles []market.Candle) ([]market.Candle, error) {
- buf, err := market.ReadCandleFile(filename)
- if err != nil {
- return candles, err
- }
- candles = append(buf, candles[:]...)
- sort.Sort(byTime(candles))
- return candles, nil
- }
|