main.go 8.8 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. // 本程序用来把永华的期货数据导入到tickserver中
  3. package main
import (
	"encoding/csv"
	"errors"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"path"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"tickserver/server/market"
)
  22. type byTime []market.Candle
  23. func (a byTime) Len() int { return len(a) }
  24. func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  25. func (a byTime) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp }
  26. var exchangeMap = map[string]string{
  27. "DL": "大商所",
  28. "SQ": "上期所",
  29. "ZJ": "中金所",
  30. "ZZ": "郑商所",
  31. "000300": "",
  32. }
  33. var sdir = flag.String("s", "ctp_history_data", "src ctp history data file path")
  34. var ddir = flag.String("d", "fzmnew", "dst ctp history data file path")
  35. var ngo = flag.Int("n", 4, "n goroutine conv data into tickserver")
  36. var dbg = flag.Bool("g", false, "debug use sqlite db")
  37. var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
  38. func main() {
  39. flag.Parse()
  40. if *cpuprofile != "" {
  41. f, err := os.Create(*cpuprofile)
  42. if err != nil {
  43. log.Fatal(err)
  44. }
  45. pprof.StartCPUProfile(f)
  46. defer pprof.StopCPUProfile()
  47. }
  48. // set log
  49. /*
  50. logF, err := os.Create("./hisconv.log.txt")
  51. if err != nil {
  52. log.Fatal(err)
  53. }
  54. defer logF.Close()
  55. log.SetOutput(logF)
  56. */
  57. log.Println(*sdir, *ddir, *ngo, *dbg)
  58. ch := make(chan string, 1)
  59. go func() {
  60. filepath.Walk(*sdir, func(path string, info os.FileInfo, err error) error {
  61. if err != nil {
  62. return err
  63. }
  64. if !info.IsDir() {
  65. ch <- path
  66. }
  67. return nil
  68. })
  69. close(ch)
  70. }()
  71. log.Fatal(run(*ddir, 1, ch)) //*ngo
  72. }
  73. func run(ddir string, n int, ch chan string) error {
  74. f := func(done chan bool) {
  75. for {
  76. path, ok := <-ch
  77. if !ok {
  78. done <- true
  79. break
  80. }
  81. // 解析文件
  82. name := filepath.Base(path)
  83. log.Println("beg parse::", name)
  84. ticks, err := parseFile(path)
  85. log.Println("end parse::", name)
  86. if err != nil {
  87. log.Println(err)
  88. continue
  89. }
  90. // 解析insId
  91. ex := "DL"
  92. for ex, _ = range exchangeMap {
  93. i := strings.Index(path, ex)
  94. if i != -1 {
  95. path = path[i:]
  96. break
  97. }
  98. }
  99. insId, err := parseInsId(path, name, ex)
  100. if err != nil {
  101. log.Println(err)
  102. }
  103. // 保存tick数据
  104. path = filepath.Join(ddir, market.Ctp)
  105. os.MkdirAll(path, os.ModePerm)
  106. //path = filepath.Join(path, name)
  107. log.Println("beg save::", name)
  108. _, err = market.SaveTickEx(path, ticks, insId, true)
  109. if err != nil {
  110. log.Println(err, path)
  111. continue
  112. }
  113. log.Println("end save::", name)
  114. // 保存K线数据
  115. log.Println("beg cand::", name)
  116. err = convAndSaveCandles(insId, ex, ticks)
  117. if err != nil {
  118. log.Println(err, path)
  119. }
  120. log.Println("end cand::", name)
  121. }
  122. }
  123. if n < 1 {
  124. n = runtime.NumCPU()
  125. }
  126. runtime.GOMAXPROCS(n)
  127. done := make(chan bool, n)
  128. for i := 0; i < n; i++ {
  129. go f(done)
  130. }
  131. for i := 0; i < n; i++ {
  132. <-done
  133. }
  134. return nil
  135. }
  136. func parseTime(stime string) (time.Time, error) {
  137. date := strings.Replace(stime, "/", "-", -1)
  138. tpl := "2006-1-2 15:04:05"
  139. if isZeroPad(stime) {
  140. tpl = "2006-01-02 15:04:05"
  141. }
  142. return time.Parse(tpl, date)
  143. }
  144. func isZeroPad(stime string) bool {
  145. date := strings.Split(stime, " ")
  146. if len(date) == 2 {
  147. return len(date[0]) == 10
  148. }
  149. return false
  150. }
  151. func parseFile(path string) ([]market.Tick, error) {
  152. if !strings.HasSuffix(path, "csv") {
  153. return nil, errors.New("history file data format error, must csv file " + path)
  154. }
  155. file, err := os.Open(path)
  156. if err != nil {
  157. return nil, err
  158. }
  159. defer file.Close()
  160. skipheader := true
  161. r := csv.NewReader(file)
  162. ticks := []market.Tick{}
  163. for {
  164. data, err := r.Read()
  165. if err == io.EOF {
  166. break
  167. }
  168. if err != nil {
  169. return nil, err
  170. }
  171. if skipheader {
  172. skipheader = false
  173. continue
  174. }
  175. t, err := parseTick(data)
  176. if err != nil {
  177. log.Println(err, path)
  178. continue
  179. }
  180. ticks = append(ticks, *t)
  181. }
  182. return ticks, nil
  183. }
  184. func convDate(st string) (*time.Time, error) {
  185. if len(st) < 6 {
  186. return nil, errors.New(st + " is error format. MUST yyyymmdd")
  187. }
  188. sy := st[:4]
  189. sm := st[4:6]
  190. sd := st[6:]
  191. y, _ := strconv.ParseInt(string(sy), 10, 64)
  192. m, _ := strconv.ParseInt(string(sm), 10, 64)
  193. d, _ := strconv.ParseInt(string(sd), 10, 64)
  194. t := time.Date(int(y), time.Month(m), int(d), 0, 0, 0, 0, time.Local)
  195. return &t, nil
  196. }
  197. func getInsId(ex, s string) string {
  198. insId := ""
  199. if ex == "DL" || ex == "SQ" {
  200. insId = strings.ToLower(s)
  201. } else {
  202. insId = strings.ToUpper(s)
  203. }
  204. return market.CtpPrefix + insId
  205. }
  206. func parseInsId(path, name, ex string) (string, error) {
  207. nameError := errors.New(name + " file name error. must xx_yyyymmdd.csv format")
  208. k := ex
  209. ss := strings.Split(name, "_") // xx_yyyymmdd.csv
  210. if len(ss) != 2 {
  211. return "", nameError
  212. }
  213. id := ss[0]
  214. if len(id) < 3 {
  215. return "", nameError
  216. }
  217. pid := id[:len(id)-2]
  218. if strings.HasSuffix(id, "MI") || strings.HasSuffix(id, "mi") { // 主力连续
  219. return getInsId(k, pid) + "MI", nil
  220. }
  221. sidt := id[len(id)-2:]
  222. idt, err := strconv.Atoi(sidt)
  223. if err != nil {
  224. return "", nameError
  225. }
  226. if idt > 12 { // 指标
  227. return getInsId(k, id), nil
  228. }
  229. ss = strings.Split(ss[1], ".") // yyyymmdd.csv
  230. if len(ss) != 2 {
  231. return "", nameError
  232. }
  233. t, err := convDate(ss[0])
  234. if err != nil {
  235. return "", nameError
  236. }
  237. y := t.Year()
  238. if strings.HasSuffix(pid, "x") || strings.HasSuffix(pid, "X") { // aX01 ==> a1601 x偶数年
  239. if y%2 == 0 {
  240. if int(t.Month()) > idt {
  241. y += 2
  242. }
  243. } else {
  244. y += 1
  245. }
  246. pid = pid[:len(pid)-1] // remove x
  247. } else if strings.HasSuffix(pid, "Y") || strings.HasSuffix(pid, "Y") { // aY01 ==> a1501 y奇数年
  248. if pid != "Y" || pid != "y" {
  249. if y%2 == 0 {
  250. y += 1
  251. } else {
  252. if int(t.Month()) > idt {
  253. y += 2
  254. }
  255. }
  256. pid = pid[:len(pid)-1] // remove y
  257. }
  258. } else if int(t.Month()) > idt {
  259. y += 1
  260. }
  261. sy := strconv.Itoa(y)
  262. insId := pid + sy[2:] + sidt // "a" + "16" + "01" = a1601
  263. return getInsId(k, insId), nil
  264. }
  265. // 日期 时间 成交价 成交量 总量 属性(持仓增减) B1价 B1量 B2价 B2量 B3价 B3量 S1价 S1量 S2价 S2量 S3价 S3量 BS
  266. func parseTick(data []string) (*market.Tick, error) {
  267. if len(data) < 13 {
  268. return nil, errors.New("len(data) < 13")
  269. }
  270. stime := data[0] + " " + data[1]
  271. t, err := parseTime(stime)
  272. if err != nil {
  273. return nil, err
  274. }
  275. price, err := strconv.ParseFloat(data[2], 64)
  276. if err != nil {
  277. return nil, err
  278. }
  279. vol, err := strconv.ParseFloat(data[3], 64)
  280. if err != nil {
  281. return nil, err
  282. }
  283. bidp1, err := strconv.ParseFloat(data[6], 64)
  284. if err != nil {
  285. return nil, err
  286. }
  287. bidv1, err := strconv.ParseFloat(data[7], 64)
  288. if err != nil {
  289. return nil, err
  290. }
  291. askpi, err := strconv.ParseFloat(data[12], 64)
  292. if err != nil {
  293. return nil, err
  294. }
  295. askv1, err := strconv.ParseFloat(data[13], 64)
  296. if err != nil {
  297. return nil, err
  298. }
  299. tick := &market.Tick{
  300. Timestamp: (t.Unix() - 3600*8) * 1000, // to utc time
  301. Price: price,
  302. Volume: vol,
  303. Bid: market.PP{bidp1, bidv1},
  304. Ask: market.PP{askpi, askv1},
  305. }
  306. return tick, nil
  307. }
  308. func convAndSaveCandles(insId, ex string, ticks []market.Tick) error {
  309. var candles []market.Candle
  310. pa := []int{market.M1, market.M5, market.H1, market.D1}
  311. for _, period := range pa {
  312. var err error
  313. if period == market.M1 {
  314. candles, err = convCandles0(ticks, insId, market.M1)
  315. } else {
  316. candles, err = convCandles1(candles, insId, period)
  317. }
  318. if err != nil {
  319. return err
  320. }
  321. newpath := filepath.Join(*ddir, market.Ctp)
  322. os.MkdirAll(newpath, os.ModePerm)
  323. if period == market.D1 {
  324. dir := path.Join(newpath, insId)
  325. os.MkdirAll(dir, 0777)
  326. var bname string
  327. bname = fmt.Sprintf("%s.gz", market.PeriodNameMap[period])
  328. fname := path.Join(dir, bname)
  329. candles, _ = combinEx(fname, candles)
  330. }
  331. _, err = market.SaveCandlesEx(newpath, insId, candles, period, true)
  332. if err != nil {
  333. return err
  334. }
  335. }
  336. return nil
  337. }
  338. func convCandles0(ticks []market.Tick, insId string, period int) ([]market.Candle, error) {
  339. r := market.NewTickBuf(ticks)
  340. return market.TickConvCandle(r, insId, period)
  341. }
  342. func convCandles1(candles []market.Candle, insId string, period int) ([]market.Candle, error) {
  343. r := market.NewCandleBuf(candles)
  344. return market.ConvPeriod(r, insId, period)
  345. }
  346. func combinEx(filename string, candles []market.Candle) ([]market.Candle, error) {
  347. buf, err := market.ReadCandleFile(filename)
  348. if err != nil {
  349. return candles, err
  350. }
  351. candles = append(buf, candles[:]...)
  352. sort.Sort(byTime(candles))
  353. return candles, nil
  354. }