lmax_his.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. // 本文件实现lmax下载的历史行情数据的下载和解析,周期转换和保存
  4. import (
  5. "compress/gzip"
  6. "encoding/csv"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "log"
  11. "net/url"
  12. "os"
  13. "path"
  14. "sort"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "time"
  19. "tickserver/api/lmaxapi"
  20. "tickserver/api/lmaxapi/request"
  21. "tickserver/api/lmaxapi/response"
  22. "tickserver/framework/event"
  23. "tickserver/server/market"
  24. )
  25. var lmaxUser = "wave2907"
  26. var lmaxPassWord = "Tg417396"
  27. var lmaxUrl = "https://trade.lmaxtrader.com"
  28. var lmaxInsTable = "Lmax_inss"
  29. var lmaxSaveDir string
  30. var lmaxDB *market.MyDB
  31. var insMap map[string]*market.Instrument
  32. var errcount int64
  33. var lmaxHisDownCh = make(chan int, 1)
  34. var lmaxHisParseCh = make(chan int, 1)
  35. var lmaxInsStartMap = make(map[string]map[int]int64)
  36. //var lmaxInsPublisher event.EventPublisher // 实时行情数据发布器
  37. var lmaxInsPublisher2 event.EventPublisher // 实时行情数据发布器
  38. type byTime []market.Candle
  39. func (a byTime) Len() int { return len(a) }
  40. func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  41. func (a byTime) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp }
  42. // 设计: 4个步骤: 订阅==>下载==>解析==>保存
  43. // 1, 单个账号轮询下载所有的instrument
  44. // 2, 分别轮询tick, M1和D1
  45. // 3, 保证订阅返回文件列表的有序
  46. // 4, 可同时下载几组返回的订阅列表
  47. // 5, 每个返回的订阅文件列表依次下载, 不允许下载失败(失败则一直循环请求下载)
  48. // 6, 每组列表下载完毕(全部下载完成), 去解析, 解析的文件列表保证时序和完整
  49. // 7, 可以同时解析多组文件(每组文件)
  50. // 8, 对日线和小时线数据合并==>一个产品只有一个日线文件, 一个产品每月一个小时线文件
  51. // 9, 历史数据==>binary==>gzip编码压缩保存
  52. // 10, 把每个文件的信息写入数据库(refer, 路径, 岂止时间和K线数量)
  53. func doErrCode(s *lmaxapi.Session, err error) {
  54. //1. 处理下面的事件
  55. //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启
  56. //1.2 session 过期,发生403错误,必须重新登录
  57. //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1
  58. // 这个时候调用 session.StreamClose() 重新启动stream
  59. operr, ok := err.(*lmaxapi.OpError)
  60. log.Println("operr:", operr)
  61. if !ok {
  62. return
  63. }
  64. //1.2
  65. if operr.Code == 403 {
  66. log.Println("stop session")
  67. s.Stop()
  68. return
  69. }
  70. //1.1 and 1.3
  71. //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session
  72. if operr.Op == "Stream" {
  73. log.Println("stop stream")
  74. if operr.Code == 0 {
  75. time.Sleep(1 * time.Second)
  76. }
  77. s.StopStream()
  78. // ss.Stop()
  79. return
  80. }
  81. }
  82. func regStreamError(ss *lmaxapi.Session) {
  83. // 注册数据流失败处理函数
  84. ss.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) {
  85. doErrCode(s, err)
  86. })
  87. }
  88. func onInstrument(value *response.Instrument) {
  89. ins := &market.Instrument{
  90. Id: market.LmaxPrefix + strconv.Itoa(int(value.Id)),
  91. Name: value.Name,
  92. ExId: market.Lmax,
  93. Typ: market.Forex,
  94. PriceInc: value.PriceIncrement,
  95. Margin: value.MarginRate,
  96. StartTime: value.StartTime.Unix() * 1000, // ms
  97. }
  98. insMap[ins.Id] = ins
  99. // log.Println(ins.Id, ins.Name, ins.ExId)
  100. //lmaxInsPublisher.Publish(ins)
  101. // lds.insPublisher2.Publish(ins)
  102. lmaxDB.InsertIns(ins, lmaxInsTable)
  103. }
  104. func login(name, password, typ, url string) (*lmaxapi.Session, error) {
  105. log.Println("LmaxDS.login:", name)
  106. lmaxapi.SetBaseUrl(url)
  107. req := request.NewLoginRequest(name, password, typ)
  108. ss, err := lmaxapi.Login(req)
  109. if err != nil {
  110. s := err.Error()
  111. if len(s) > 32 {
  112. s = s[:32]
  113. }
  114. log.Println("lmaxapi.Login error:", s)
  115. return nil, err
  116. }
  117. log.Println(name, "login OK!")
  118. return ss, nil
  119. }
  120. func DownloadLmax(dir string, db *market.MyDB) {
  121. var chkMu sync.Mutex
  122. chkSS := false
  123. insMap = make(map[string]*market.Instrument)
  124. lmaxDB = db
  125. lmaxSaveDir = dir
  126. //此 goroutine用来触发下载历史数据, 每分钟下载一个
  127. go func() {
  128. //insMap := make(map[string]*market.Instrument)
  129. //lmaxInsPublisher.Event().Attach(func(v interface{}) error {
  130. //ins := v.(*market.Instrument)
  131. //insMap[ins.Id] = ins
  132. //return nil
  133. //})
  134. for {
  135. <-lmaxHisDownCh
  136. <-lmaxHisParseCh
  137. for _, ins := range insMap {
  138. //if ins.Id != "lmax_4001" {
  139. //continue
  140. //}
  141. //log.Println("ins.Id", ins.Id)
  142. chkMu.Lock()
  143. if !chkSS {
  144. time.Sleep(time.Minute * 10) // 防止不断重复发送下载请求
  145. }
  146. chkMu.Unlock()
  147. //if ins.Id == "lmax_4001" {
  148. lmaxInsPublisher2.Publish(ins)
  149. //}
  150. time.Sleep(time.Second * 10)
  151. }
  152. time.Sleep(time.Second * 10)
  153. }
  154. }()
  155. fn := func() {
  156. for {
  157. chkMu.Lock()
  158. chkSS = false
  159. chkMu.Unlock()
  160. // 登录产生新的Session
  161. ss, err := login(lmaxUser, lmaxPassWord, request.ProductType.CFD_LIVE, lmaxUrl)
  162. if err != nil {
  163. time.Sleep(time.Second * 10)
  164. continue
  165. }
  166. ss.RegisterSessionDisconnected(func(s *lmaxapi.Session) {
  167. log.Println("session disconnected. STOP session!")
  168. s.Stop()
  169. })
  170. regStreamError(ss)
  171. downloadRun(ss) // 历史数据
  172. ss.LoadAllInstruments(func(value *response.Instrument) {
  173. onInstrument(value)
  174. //ss.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), DefaultSubscribeCB)
  175. })
  176. ss.Wait()
  177. // 设置心跳
  178. ss.HeartbeatTimeout(5 * time.Second)
  179. // 会话启动
  180. chkMu.Lock()
  181. chkSS = true
  182. chkMu.Unlock()
  183. ss.Start()
  184. // 有错误时跳出Start重启
  185. log.Println("lmax session RESTART !!!")
  186. }
  187. }
  188. fn()
  189. }
  190. func ifDuplicateDownload(insId string, period int, start int64) bool {
  191. stmap, ok := lmaxInsStartMap[insId]
  192. if ok {
  193. st, ok1 := stmap[period]
  194. if ok1 {
  195. if st == start {
  196. return true
  197. } else {
  198. lmaxInsStartMap[insId][period] = start
  199. return false
  200. }
  201. } else {
  202. lmaxInsStartMap[insId][period] = start
  203. return false
  204. }
  205. } else {
  206. lmaxInsStartMap[insId] = make(map[int]int64)
  207. lmaxInsStartMap[insId][period] = start
  208. return false
  209. }
  210. }
  211. func downloadRun(ss *lmaxapi.Session) {
  212. dlParser := newParser(1) // 解析器
  213. dlTask := newDownTask(lmaxDB, dlParser, lmaxSaveDir, "", 1) // 下载任务器
  214. ss.RegisterHistoricMarketDataEvent(func(s *lmaxapi.Session, event *response.HistoricMarketDataEvent) {
  215. if len(event.Urls) == 0 {
  216. log.Println("event.Urls:", len(event.Urls))
  217. return
  218. }
  219. //for i, v := range event.Urls {
  220. //log.Println("url:", i, v)
  221. //}
  222. dlTask.downloadUrls(ss, event.Urls)
  223. })
  224. lmaxInsPublisher2.Event().Attach(func(v interface{}) error {
  225. ins := v.(*market.Instrument)
  226. //insId := market.LmaxPrefix + strconv.Itoa(int(4001))
  227. //if ins.Id != insId {
  228. //return nil
  229. //}
  230. lmaxId := market.RealInsId(ins.Id)
  231. ps := []int{0, market.M1, market.D1}
  232. for _, period := range ps {
  233. downloadInsHis(ss, ins.Id, lmaxId, period)
  234. }
  235. return nil
  236. })
  237. }
  238. func downloadInsHis(ss *lmaxapi.Session, insId, lmaxId string, period int) {
  239. // log.Println("downloadInsHis", insId, lmaxId, period)
  240. switch period {
  241. case 0:
  242. downloadInsTicks(ss, insId, lmaxId)
  243. default:
  244. downloadInsCandles(ss, insId, lmaxId, period)
  245. }
  246. }
  247. // downloadInsTicks 下载tick数据
  248. // update == true 指示是否下载最新数据
  249. func downloadInsTicks(ss *lmaxapi.Session, insId, lmaxId string) {
  250. st := insMap[insId].StartTime / 1000
  251. //if market.Debug {
  252. //st = time.Date(2015, 1, 1, 0, 0, 0, 0, time.Local).Unix()
  253. //}
  254. //r, err := lds.db.GetLastTime(insId, 0)
  255. //if err == nil && r != nil {
  256. //log.Println("downloadInsTicks", insId, err, st, r.StartTime)
  257. //st = r.StartTime / 1000
  258. //}
  259. startTime, err := lmaxDB.GetInsStartTime(lmaxInsTable, insId, 0)
  260. startTime /= 1000
  261. log.Println("downloadInsTicks", insId, err, st, startTime)
  262. if err == nil && startTime > st {
  263. st = startTime
  264. }
  265. //if time.Now().Unix()-st < int64(market.D1) { //market.M1
  266. //return
  267. //}
  268. if ifDuplicateDownload(insId, 0, st) {
  269. return
  270. }
  271. log.Println("downloadInsTicks:", insId, lmaxId, st)
  272. iid, _ := strconv.Atoi(lmaxId)
  273. req := request.NewHistoricSubscriptionRequest()
  274. ss.Subscribe(req, func(err error) {
  275. if err != nil {
  276. // log.Println("downloadInsTicks Subscribe history request error:", err)
  277. doErrCode(ss, err) // 错误处理
  278. return
  279. }
  280. hreq := request.NewTopOfBookHistoricRequest(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV")
  281. log.Println(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV")
  282. ss.RequestHistoricMarketData(hreq, func(err error) {
  283. if err != nil {
  284. log.Println("downloadInsTicks RequestHistoricMarketData error", err)
  285. }
  286. })
  287. return
  288. })
  289. }
  290. // downloadInsCandles 下载历史K线数据
  291. // update == true 指示是否下载最新数据
  292. func downloadInsCandles(ss *lmaxapi.Session, insId, lmaxId string, period int) {
  293. ps := "MINUTE"
  294. if period == market.D1 {
  295. ps = "DAY"
  296. }
  297. st := insMap[insId].StartTime / 1000
  298. //if market.Debug {
  299. //st = time.Date(2015, 1, 1, 0, 0, 0, 0, time.Local).Unix()
  300. //}
  301. //r, err := lds.db.GetLastTime(insId, period)
  302. //if err == nil && r != nil {
  303. //log.Println("downloadInsCandles", insId, err, st, r.StartTime, r.EndTime, period)
  304. //st = r.EndTime / 1000
  305. //if period == market.M1 {
  306. //st = r.StartTime / 1000
  307. //}
  308. //}
  309. startTime, err := lmaxDB.GetInsStartTime(lmaxInsTable, insId, period)
  310. startTime /= 1000
  311. log.Println("downloadInsCandles", insId, err, st, startTime, period)
  312. if err == nil && startTime > st {
  313. st = startTime
  314. }
  315. //if time.Now().Unix()-st < market.D1 { //int64(period)
  316. //return
  317. //}
  318. if ifDuplicateDownload(insId, period, st) {
  319. return
  320. }
  321. log.Println("lds.downloadInsCandles", insId, lmaxId, ps, st)
  322. iid, _ := strconv.Atoi(lmaxId)
  323. req := request.NewHistoricSubscriptionRequest()
  324. ss.Subscribe(req, func(err error) {
  325. if err != nil {
  326. // log.Println("downloadInsCandles Subscribe error:", err)
  327. doErrCode(ss, err) // 错误处理
  328. return
  329. }
  330. hreq := request.NewAggregateHistoricRequest(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV",
  331. ps, []string{"BID"}) // 使用卖价
  332. log.Println(lmaxId, int64(iid), time.Unix(st, 0), time.Now(), "CSV", ps, []string{"BID"})
  333. ss.RequestHistoricMarketData(hreq, func(err error) {
  334. if err != nil {
  335. log.Println("downloadInsCandles RequestHistoricMarketData error:", err)
  336. }
  337. })
  338. })
  339. }
  340. type dlWork struct {
  341. url string
  342. ss *lmaxapi.Session
  343. }
  344. // 下载任务
  345. type downTask struct {
  346. in chan []*dlWork
  347. db *market.MyDB
  348. p *parser
  349. dirMu sync.Mutex
  350. dir string
  351. dir1 string
  352. mu sync.Mutex
  353. wss [][]*dlWork
  354. }
  355. func (task *downTask) addws(ws []*dlWork) bool {
  356. task.mu.Lock()
  357. defer task.mu.Unlock()
  358. for _, s := range task.wss {
  359. for _, w := range s {
  360. if w.url == ws[0].url {
  361. _, _, _, err := task.parseUrl(w.url)
  362. if err != nil {
  363. return false
  364. }
  365. //r, err := task.db.GetLastTime(insId, period)
  366. //if err != nil {
  367. //return false
  368. //}
  369. //if r != nil && fname == r.Refer {
  370. //// log.Println("download newest url:", w.url)
  371. //return true
  372. //}
  373. return false
  374. }
  375. }
  376. }
  377. task.wss = append(task.wss, ws)
  378. return true
  379. }
  380. // downloadUrls 下载一种instrument的url列表
  381. // url列表是以时间顺序的, 下载完毕后, 要保证顺序给到解析器解析
  382. func (task *downTask) downloadUrls(ss *lmaxapi.Session, urls []string) {
  383. ws := make([]*dlWork, len(urls))
  384. for i := 0; i < len(urls); i++ {
  385. w := &dlWork{urls[i], ss}
  386. ws[i] = w
  387. }
  388. //if task.addws(ws) {
  389. task.in <- ws
  390. //}
  391. }
  392. func newDownTask(db *market.MyDB, p *parser, dir, dir1 string, max int) *downTask {
  393. task := &downTask{
  394. in: make(chan []*dlWork, 1024),
  395. db: db,
  396. p: p,
  397. dir: dir,
  398. dir1: dir1,
  399. }
  400. // 并发下载
  401. for i := 0; i < max; i++ {
  402. go task.dowork()
  403. }
  404. return task
  405. }
  406. func (task *downTask) dowork() {
  407. for {
  408. var ws []*dlWork
  409. select {
  410. case ws = <-task.in:
  411. default:
  412. select {
  413. case lmaxHisDownCh <- 1:
  414. default:
  415. }
  416. time.Sleep(time.Second * 1)
  417. continue
  418. }
  419. //ws := <-task.in
  420. //pws := []*parseWork{}
  421. wCount := len(ws)
  422. //pwCh := make(chan *parseWork, wCount)
  423. //errCh := make(chan error, wCount)
  424. resultCh := make(chan int, wCount)
  425. wsCh := make(chan *dlWork, 1)
  426. log.Println("download begin")
  427. go func() {
  428. for _, w := range ws {
  429. wsCh <- w
  430. }
  431. }()
  432. for index := 0; index < 4; index++ {
  433. go func() {
  434. for {
  435. w := <-wsCh
  436. i := 0
  437. for {
  438. log.Println("down:", w.url)
  439. pw, err := task.downloadOne(w.ss, w.url)
  440. if err != nil {
  441. log.Println("down error:", err, w.url)
  442. i++
  443. if i == 3 {
  444. resultCh <- 0
  445. //errCh <- err
  446. break
  447. }
  448. continue
  449. }
  450. var print_url, print_fname string
  451. if w != nil {
  452. print_url = w.url
  453. }
  454. if pw != nil {
  455. print_fname = pw.fname
  456. }
  457. log.Println("down ok:", print_url, print_fname)
  458. //pwCh <- pw
  459. if pw != nil {
  460. pws := []*parseWork{}
  461. pws = append(pws, pw)
  462. task.p.parse(pws)
  463. pws = nil
  464. }
  465. resultCh <- 1
  466. break
  467. }
  468. }
  469. }()
  470. }
  471. //bErr := false
  472. //var pw *parseWork
  473. //var err error
  474. for count := 0; count < len(ws); count++ {
  475. <-resultCh
  476. //select {
  477. //case pw = <-pwCh:
  478. //if pw != nil {
  479. //pws = append(pws, pw)
  480. //}
  481. //case err = <-errCh:
  482. //bErr = true
  483. //}
  484. }
  485. //close(wsCh)
  486. log.Println("download end")
  487. //if bErr {
  488. //log.Println("downloadOne error:", err, len(pws), len(ws))
  489. ////return
  490. //}
  491. // 解析一组文件
  492. //task.p.parse(pws)
  493. }
  494. }
  495. /*func (task *downTask) dowork() {
  496. for {
  497. ws := <-task.in
  498. pws := []*parseWork{}
  499. for _, w := range ws {
  500. i := 0
  501. for {
  502. pw, err := task.downloadOne(w.ss, w.url)
  503. if err != nil {
  504. i++
  505. if i == 3 {
  506. log.Printf("downloadOne error:", err)
  507. return
  508. }
  509. continue
  510. }
  511. if pw != nil {
  512. pws = append(pws, pw)
  513. }
  514. break
  515. }
  516. }
  517. // 解析一组文件
  518. task.p.parse(pws)
  519. }
  520. }*/
  521. func (task *downTask) parseUrl(u string) (fname, insId string, period int, e error) {
  522. surl, err := url.Parse(u)
  523. if err != nil {
  524. e = err
  525. return
  526. }
  527. bname := path.Base(surl.Path)
  528. ss := strings.Split(bname, "-")
  529. if len(ss) < 8 {
  530. e = errors.New("Url is error:")
  531. return
  532. }
  533. // 根据url文件名解析出insId和period
  534. insId = market.LmaxPrefix + ss[7]
  535. if strings.Contains(bname, "tick") {
  536. period = 0
  537. } else if strings.Contains(bname, "minute") {
  538. period = market.M1
  539. } else if strings.Contains(bname, "day") {
  540. period = market.D1
  541. } else {
  542. log.Println("task.downloadOne error: url is error:", u)
  543. }
  544. fname = task.dir + surl.Path
  545. return
  546. }
  547. func (task *downTask) downloadOne(session *lmaxapi.Session, u string) (*parseWork, error) {
  548. fname, insId, period, err := task.parseUrl(u)
  549. if err != nil {
  550. return nil, err
  551. }
  552. if _, err := os.Stat(fname); !os.IsNotExist(err) {
  553. return &parseWork{fname, insId, period}, nil //return nil, nil
  554. // 文件存在
  555. //r, err := task.db.GetLastTime(insId, period)
  556. //if err != nil {
  557. //return &parseWork{fname, insId, period}, nil
  558. //}
  559. //if r != nil && fname != r.Refer {
  560. //// 并不是最新的文件, 那么无需下载
  561. //return nil, nil
  562. //}
  563. }
  564. resp, err := session.OpenUrl(u, nil)
  565. if err != nil {
  566. return nil, err
  567. }
  568. defer resp.Body.Close()
  569. // log.Println(u)
  570. // 保存到本地
  571. fpath := path.Dir(fname)
  572. os.MkdirAll(fpath, 0777)
  573. w, err := os.Create(fname)
  574. if err != nil {
  575. return nil, err
  576. }
  577. n, err := io.Copy(w, resp.Body)
  578. w.Close()
  579. if err != nil {
  580. if err != io.EOF {
  581. return nil, err
  582. }
  583. }
  584. if n == 0 {
  585. return nil, errors.New("io.Copy write 0 byte: " + u)
  586. }
  587. return &parseWork{fname, insId, period}, nil
  588. }
  589. type parseWork struct {
  590. fname string
  591. insId string
  592. period int
  593. }
  594. // 解析器
  595. type parser struct {
  596. pwch chan []*parseWork // 缓存解析任务
  597. mu sync.Mutex
  598. pwss [][]*parseWork
  599. }
  600. func newParser(max int) *parser {
  601. fch := make(chan []*parseWork, 32)
  602. p := &parser{pwch: fch}
  603. // 指定数量的并发解析
  604. for i := 0; i < max; i++ {
  605. go p.doParse()
  606. }
  607. return p
  608. }
  609. // 避免重复解析
  610. func (p *parser) addpws(pws []*parseWork) bool {
  611. p.mu.Lock()
  612. defer p.mu.Unlock()
  613. for _, ws := range p.pwss {
  614. for _, pw := range ws {
  615. if pws[0].fname == pw.fname {
  616. //r, err := lmaxDB.GetLastTime(pw.insId, pw.period)
  617. //if err != nil {
  618. //return false
  619. //}
  620. //if r != nil && pw.fname == r.Refer {
  621. //return true
  622. //}
  623. return false
  624. }
  625. }
  626. }
  627. p.pwss = append(p.pwss, pws)
  628. return true
  629. }
  630. func (p *parser) parse(pws []*parseWork) {
  631. if len(pws) == 0 {
  632. return
  633. }
  634. //if p.addpws(pws) {
  635. p.pwch <- pws
  636. //}
  637. }
  638. func (p *parser) doParse() {
  639. for {
  640. var pws []*parseWork
  641. select {
  642. case pws = <-p.pwch:
  643. default:
  644. select {
  645. case lmaxHisParseCh <- 1:
  646. default:
  647. }
  648. time.Sleep(time.Second * 1)
  649. continue
  650. }
  651. //pws := <-p.pwch
  652. for _, w := range pws {
  653. log.Println("parse:", w.fname)
  654. ticks, candles, err := parseFile(w.fname, w.period)
  655. if err != nil {
  656. log.Println("lmax parseFile error:", err, w.fname)
  657. badname := w.fname + ".bad"
  658. os.Remove(badname)
  659. os.Rename(w.fname, badname)
  660. continue
  661. }
  662. log.Println("save:", w.fname)
  663. err = p.doSave(w.fname, w.insId, w.period, ticks, candles)
  664. if err != nil {
  665. log.Println("lmax doSave error:", err, w.fname)
  666. }
  667. }
  668. }
  669. }
  670. // parseFile 解析文件, refer表明文件来源
  671. func parseFile(refer string, period int) ([]market.Tick, []market.Candle, error) {
  672. // 解析为[]Tick或者[]Candle
  673. f, err := os.Open(refer)
  674. if err != nil {
  675. return nil, nil, errors.New("parseFile error: " + err.Error())
  676. }
  677. defer f.Close()
  678. return parse(f, period)
  679. }
  680. func ParseFile(refer string, period int) ([]market.Tick, []market.Candle, error) {
  681. return parseFile(refer, period)
  682. }
  683. func doConv(candles []market.Candle, insId string, period int) ([]market.Candle, error) {
  684. r := market.NewCandleBuf(candles)
  685. buf, err := market.ConvPeriod(r, insId, period)
  686. if err != nil {
  687. log.Println(err)
  688. }
  689. if len(buf) == 0 {
  690. return nil, errors.New("doConv error: result buf len is ZERO")
  691. }
  692. return buf, nil
  693. }
  694. func (p *parser) doSave(refer, insId string, period int, ticks []market.Tick, candles []market.Candle) error {
  695. // 保存到本地
  696. //for i := 0; i < len(ticks); i++ {
  697. //log.Println(ticks[i].Timestamp)
  698. //}
  699. if period == 0 { // tick
  700. if len(ticks) == 0 {
  701. return io.ErrUnexpectedEOF
  702. }
  703. fname := strings.Replace(refer, "csv.gz", "TK", -1)
  704. //err := market.SaveTicks(lmaxDB, refer, fname, ticks, insId)
  705. _, err := market.SaveTickEx(lmaxSaveDir, ticks, insId, true)
  706. if err != nil {
  707. return err
  708. }
  709. lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, ticks[len(ticks)-1].Timestamp, 0)
  710. log.Println("doSave", insId, ticks[0].Timestamp, 0, fname)
  711. } else { // candle
  712. pname := market.PeriodNameMap[period]
  713. fname := strings.Replace(refer, "csv.gz", pname, -1)
  714. if len(candles) == 0 {
  715. return io.ErrUnexpectedEOF
  716. }
  717. if period == market.D1 {
  718. bname := path.Base(refer)
  719. ss := strings.Split(bname, "-")
  720. if len(ss) < 8 {
  721. log.Println(bname)
  722. return errors.New("saveD1 error: " + fname)
  723. }
  724. dir := path.Dir(refer)
  725. for ; path.Base(dir) != ss[7]; dir = path.Dir(dir) {
  726. }
  727. fname = path.Join(dir, "candle.D1")
  728. dir = path.Join(lmaxSaveDir, insId)
  729. os.MkdirAll(dir, 0777)
  730. bname = fmt.Sprintf("%s.gz", market.PeriodNameMap[period])
  731. fname = path.Join(dir, bname)
  732. candles, _ = combinEx(fname, candles)
  733. //err := market.SaveH1OrD1(lmaxDB, refer, fname, candles, insId, market.D1)
  734. _, err := market.SaveCandlesEx(lmaxSaveDir, insId, candles, market.D1, true)
  735. if err != nil {
  736. return err
  737. }
  738. lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, candles[len(candles)-1].Timestamp, period)
  739. log.Println("doSave", insId, candles[0].Timestamp, period)
  740. return nil
  741. }
  742. //err := market.SaveCandles(lmaxDB, refer, fname, candles, insId, period)
  743. _, err := market.SaveCandlesEx(lmaxSaveDir, insId, candles, period, true)
  744. if err != nil {
  745. return err
  746. }
  747. lmaxDB.UpdateInsStartTime(lmaxInsTable, insId, candles[len(candles)-1].Timestamp, period)
  748. log.Println("doSave", insId, candles[0].Timestamp, period)
  749. // 转换周期
  750. if period == market.M1 {
  751. buf, err := doConv(candles, insId, market.M5)
  752. if err != nil {
  753. return err
  754. }
  755. //refer := fname
  756. fname = strings.Replace(fname, "M1", "M5", -1)
  757. //err = market.SaveCandles(lmaxDB, refer, fname, buf, insId, market.M5)
  758. _, err = market.SaveCandlesEx(lmaxSaveDir, insId, buf, market.M5, true)
  759. if err != nil {
  760. return err
  761. }
  762. // log.Println("H1:", fname)
  763. buf, err = doConv(buf, insId, market.H1)
  764. if err != nil {
  765. return err
  766. }
  767. fname = path.Join(path.Dir(fname), "candle.H1")
  768. //return market.SaveH1OrD1(lmaxDB, refer, fname, buf, insId, market.H1)
  769. _, err = market.SaveCandlesEx(lmaxSaveDir, insId, buf, market.H1, true)
  770. return err
  771. }
  772. }
  773. return nil
  774. }
  775. // parse 从r读取所有的数据, 解析为[]tick或者[]Candle
  776. func parse(r io.Reader, period int) ([]market.Tick, []market.Candle, error) {
  777. zipr, err := gzip.NewReader(r)
  778. if err != nil {
  779. return nil, nil, err
  780. }
  781. candles := []market.Candle{}
  782. ticks := []market.Tick{}
  783. first := true
  784. csvr := csv.NewReader(zipr)
  785. for {
  786. data, err := csvr.Read()
  787. if err != nil {
  788. if err != io.EOF {
  789. return nil, nil, err
  790. }
  791. break
  792. }
  793. if first {
  794. first = false
  795. continue
  796. }
  797. if len(data) > 0 && data[0] == "" {
  798. continue
  799. }
  800. if len(data) > 1 && data[1] == "" {
  801. continue
  802. }
  803. if period == 0 {
  804. tick, e := parseTick(data)
  805. if e != nil {
  806. log.Println(e, data)
  807. continue
  808. }
  809. ticks = append(ticks, *tick)
  810. } else {
  811. candle, e := parseCandle(data)
  812. if e != nil {
  813. log.Println(e, data)
  814. continue
  815. }
  816. candles = append(candles, *candle)
  817. }
  818. }
  819. return ticks, candles, nil
  820. }
  821. // parseTick解析一行csv数据为一个tick
  822. func parseTick(data []string) (*market.Tick, error) {
  823. tick := market.Tick{}
  824. if len(data) != 5 {
  825. return nil, errors.New("error data format.")
  826. }
  827. timestamp, err := strconv.ParseInt(data[0], 10, 64)
  828. if err != nil {
  829. return nil, err
  830. }
  831. tick.Timestamp = timestamp
  832. var ff [4]float64
  833. for i := 1; i < 5; i++ {
  834. f, err := strconv.ParseFloat(data[i], 64)
  835. if err != nil {
  836. return nil, err
  837. }
  838. ff[i-1] = f
  839. }
  840. tick.Price = ff[0] // bid price
  841. tick.Volume = ff[1] // bid qty
  842. tick.Bid[0] = ff[0]
  843. tick.Bid[1] = ff[1]
  844. tick.Ask[0] = ff[2]
  845. tick.Ask[1] = ff[3]
  846. return &tick, nil
  847. }
  848. // parseTick解析一行csv数据为一个candle
  849. func parseCandle(data []string) (*market.Candle, error) {
  850. if len(data) < 11 {
  851. return nil, errors.New("the csv field not match")
  852. }
  853. t, err := strconv.ParseInt(data[0], 10, 64)
  854. if err != nil {
  855. return nil, err
  856. }
  857. var ff [10]float64
  858. for i := 0; i < 10; i++ {
  859. ff[i], err = strconv.ParseFloat(data[i+1], 64)
  860. if err != nil {
  861. return nil, err
  862. }
  863. }
  864. return &market.Candle{
  865. Timestamp: t,
  866. Open: ff[0],
  867. High: ff[1],
  868. Low: ff[2],
  869. Close: ff[3],
  870. RealVolums: ff[4] + ff[5] + ff[6],
  871. TickVolums: ff[7] + ff[8] + ff[9],
  872. }, nil
  873. }
  874. func combinEx(filename string, candles []market.Candle) ([]market.Candle, error) {
  875. buf, err := market.ReadCandleFile(filename)
  876. if err != nil {
  877. return candles, err
  878. }
  879. candles = append(buf, candles[:]...)
  880. sort.Sort(byTime(candles))
  881. return candles, nil
  882. }
  883. // 周期为D1的只保存一个文件
  884. // 周期为H1的每个月保存一个文件
  885. // 其他周期(<H1),每天保存一个文件
  886. // func saveH1OrD1(db *market.MyDB, refer, filename string, candles []market.Candle, insId string, period int) error {
  887. // buf, _ := combin(filename, candles)
  888. // return market.SaveCandles(db, refer, filename, buf, insId, period)
  889. // }