readtick.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package main
// Reads gzip-compressed CSV tick files and produces base.TickGo values.
// OpenDir(dir, instrumentId)
// Read() (*base.TickGo, error)
  5. import "fmt"
  6. import "io"
  7. import "os"
  8. import "lmaxapi/base"
  9. import "log"
  10. import "strings"
  11. import "compress/gzip"
  12. import "encoding/csv"
  13. import "strconv"
  14. import "errors"
  15. import "lmaxapi/markinfo"
  16. import "flag"
  17. import "runtime/pprof"
  18. //import "runtime"
  19. import "time"
  20. type TickRead struct {
  21. tickch chan *base.TickGo
  22. errch chan error
  23. }
  24. func (tr *TickRead) Read() (*base.TickGo, error) {
  25. tick := <-tr.tickch
  26. if tick == nil {
  27. return nil, <-tr.errch
  28. }
  29. return tick, nil
  30. }
  31. type CandleRead struct {
  32. candlech chan *base.OhlcGo
  33. errch chan error
  34. }
  35. func (tr *CandleRead) Read() (*base.OhlcGo, error) {
  36. candle := <-tr.candlech
  37. if candle == nil {
  38. return nil, <-tr.errch
  39. }
  40. return candle, nil
  41. }
  42. var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
  43. func main() {
  44. //runtime.GOMAXPROCS(4)
  45. s := time.Now()
  46. defer func() {
  47. log.Println(time.Now().Sub(s))
  48. }()
  49. flag.Parse()
  50. if *cpuprofile != "" {
  51. f, err := os.Create(*cpuprofile)
  52. if err != nil {
  53. log.Fatal(err)
  54. }
  55. pprof.StartCPUProfile(f)
  56. defer func() {
  57. log.Println("stop pprof.")
  58. pprof.StopCPUProfile()
  59. }()
  60. }
  61. reader, err := m1Reader()
  62. if err != nil {
  63. log.Println(err)
  64. return
  65. }
  66. symbolId, err := markinfo.BookIdToSymbolId(4001)
  67. if err != nil {
  68. log.Println(err)
  69. return
  70. }
  71. //货币对
  72. //周期
  73. //初始K线(已经生成了部分k线, 在这个基础上继续生成)
  74. //时区,默认是从 GMT+0 到 GMT+0
  75. //数据源,支持两种接口:base.TickReader base.CandleReader
  76. gen, err := base.NewCandleGenerate(symbolId, base.H1, nil, nil, reader)
  77. if err != nil {
  78. log.Println(err)
  79. return
  80. }
  81. //一直读取
  82. var prev *base.OhlcGo
  83. for {
  84. //这里会输出所有的k线生成的过程,每次tick跳动,就会生成一个ohlc
  85. ohlc, err := gen.Read()
  86. if err == io.EOF {
  87. break
  88. }
  89. if err != nil {
  90. log.Println(err)
  91. break
  92. }
  93. if prev == nil {
  94. prev = ohlc
  95. }
  96. if prev.Time != ohlc.Time { //产生新的K线
  97. // log.Println(prev)
  98. }
  99. prev = ohlc
  100. }
  101. log.Println("end")
  102. }
  103. func m1Reader() (*CandleRead, error) {
  104. reader, err := OpenDir("./lmax", 4001)
  105. if err != nil {
  106. return nil, err
  107. }
  108. symbolId, err := markinfo.BookIdToSymbolId(4001)
  109. if err != nil {
  110. return nil, err
  111. }
  112. //货币对
  113. //周期
  114. //初始K线(已经生成了部分k线, 在这个基础上继续生成)
  115. //时区,默认是从 GMT+0 到 GMT+0
  116. //数据源,支持两种接口:base.TickReader base.CandleReader
  117. gen, err := base.NewCandleGenerate(symbolId, base.S15, nil, nil, reader)
  118. if err != nil {
  119. return nil, err
  120. }
  121. ch := make(chan *base.OhlcGo, 1024)
  122. errch := make(chan error)
  123. reader2 := &CandleRead{}
  124. reader2.candlech = ch
  125. reader2.errch = errch
  126. //一直读取
  127. go func() {
  128. var prev *base.OhlcGo
  129. for {
  130. //这里会输出所有的k线生成的过程,每次tick跳动,就会生成一个ohlc
  131. ohlc, err := gen.Read()
  132. if err == io.EOF {
  133. break
  134. }
  135. if err != nil {
  136. log.Println(err)
  137. break
  138. }
  139. if prev == nil {
  140. prev = ohlc
  141. reader2.candlech <- ohlc
  142. }
  143. if prev.Time != ohlc.Time { //产生新的K线
  144. t := time.Unix(int64(prev.Time), 0)
  145. log.Println(t, prev.Open == prev.Close, prev.High == prev.Low, prev.RealVolumn, prev.TickVolumn)
  146. reader2.candlech <- prev
  147. }
  148. prev = ohlc
  149. }
  150. // log.Println("end:", prev)
  151. }()
  152. return reader2, nil
  153. }
  154. func OpenDir(dir string, instrumentId int64) (*TickRead, error) {
  155. files, err := getfilelist(dir+"/marketdata/orderbook/"+fmt.Sprint(instrumentId), ".csv.gz")
  156. if err != nil {
  157. return nil, err
  158. }
  159. ch := make(chan *base.TickGo, 1)
  160. errch := make(chan error)
  161. reader := &TickRead{}
  162. reader.tickch = ch
  163. reader.errch = errch
  164. i := 0
  165. go func() {
  166. for _, file := range files {
  167. err := readGzipCsv(file, instrumentId, func(ti *base.TickGo) {
  168. t := time.Unix(int64(ti.Time), 0)
  169. log.Println("@@@", t, ti.Ask, ti.Bid, ti.Askv, ti.Bidv)
  170. ch <- ti
  171. })
  172. i++
  173. if i == 20 {
  174. break
  175. }
  176. if err == nil || err == io.EOF {
  177. continue
  178. }
  179. ch <- nil
  180. errch <- err
  181. break
  182. }
  183. //发送结束标记
  184. ch <- nil
  185. errch <- io.EOF
  186. }()
  187. return reader, nil
  188. }
  189. //采用广度优先算法,遍历文件
  190. func getfilelist(dir string, ext string) ([]string, error) {
  191. queue := make([]string, 0)
  192. files := make([]string, 0)
  193. queue = append(queue, dir)
  194. for len(queue) > 0 {
  195. top := queue[0]
  196. queue = queue[1:]
  197. //readdir
  198. handle, err := os.Open(top)
  199. if err != nil {
  200. return nil, err
  201. }
  202. defer handle.Close()
  203. fis, err := handle.Readdir(0)
  204. if err != nil {
  205. return nil, err
  206. }
  207. for _, fi := range fis {
  208. path := top + "/" + fi.Name()
  209. if fi.IsDir() {
  210. queue = append(queue, path)
  211. } else if strings.HasSuffix(path, ext) {
  212. files = append(files, path)
  213. }
  214. }
  215. }
  216. return files, nil
  217. }
  218. func readGzipCsv(file string, instrumentId int64, cb func(*base.TickGo)) error {
  219. handle, err := os.Open(file)
  220. if err != nil {
  221. return err
  222. }
  223. defer handle.Close()
  224. reader, err := gzip.NewReader(handle)
  225. if err != nil {
  226. return err
  227. }
  228. defer reader.Close()
  229. csvreader := csv.NewReader(reader)
  230. log.Println(file)
  231. first := true
  232. for {
  233. data, err := csvreader.Read()
  234. if err == io.EOF {
  235. return nil
  236. }
  237. if err != nil {
  238. continue
  239. }
  240. if first {
  241. first = false
  242. continue
  243. }
  244. if len(data) > 1 && data[1] == "" {
  245. continue
  246. }
  247. //第一行 || 空行 || 不合格的数据
  248. if data[0] == "" || len(data) != 5 {
  249. continue
  250. }
  251. //处理数据
  252. ti, err := toTickGo(data, int(instrumentId))
  253. if err != nil {
  254. return err
  255. }
  256. cb(ti)
  257. }
  258. return nil
  259. }
  260. func toTickGo(data []string, instrumentId int) (*base.TickGo, error) {
  261. tick := &base.TickGo{}
  262. if len(data) != 5 {
  263. return nil, errors.New("error data format.")
  264. }
  265. symbolId, err := markinfo.BookIdToSymbolId(instrumentId)
  266. if err != nil {
  267. return nil, errors.New("error instrumentId.")
  268. }
  269. timestamp, err := strconv.ParseInt(data[0], 10, 64)
  270. if err != nil {
  271. return nil, err
  272. }
  273. tick.Symbol = int16(symbolId)
  274. tick.Time = int32(timestamp / 1000)
  275. tick.Ms = int16(timestamp % 1000)
  276. bid, err := strconv.ParseFloat(data[1], 64)
  277. if err != nil {
  278. return nil, err
  279. }
  280. bidv, err := strconv.ParseFloat(data[2], 64)
  281. if err != nil {
  282. return nil, err
  283. }
  284. ask, err := strconv.ParseFloat(data[3], 64)
  285. if err != nil {
  286. return nil, err
  287. }
  288. askv, err := strconv.ParseFloat(data[4], 64)
  289. if err != nil {
  290. return nil, err
  291. }
  292. tick.Bid = float32(bid)
  293. tick.Ask = float32(ask)
  294. tick.Bidv = int32(bidv)
  295. tick.Askv = int32(askv)
  296. return tick, nil
  297. }