HistoricMarketDataRequest.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "hash/crc32"
  6. "io"
  7. "io/ioutil"
  8. "tickserver/api/lmaxapi"
  9. "tickserver/api/lmaxapi/main/common"
  10. "tickserver/api/lmaxapi/request"
  11. "tickserver/api/lmaxapi/response"
  12. "log"
  13. "net/url"
  14. "os"
  15. "path"
  16. "runtime"
  17. "time"
  18. "strconv"
  19. )
  20. var ty = flag.String("t", "tick", "tick or M1")
  21. var bookId = flag.String("bookid", "4001", "lmax book id")
  22. func CreateSession() (*lmaxapi.Session, error) {
  23. lmaxapi.SetBaseUrl("https://trade.lmaxtrader.com")
  24. req := request.NewLoginRequest("wave2907", "Tg417396", request.ProductType.CFD_LIVE)
  25. session, err := lmaxapi.Login(req)
  26. if err == nil {
  27. log.Println("My accountId is:", session.GetAccountDetails().AccountId)
  28. }
  29. return session, err
  30. }
  31. func main() {
  32. runtime.GOMAXPROCS(runtime.NumCPU())
  33. common.EnableLog("proto.log")
  34. flag.Parse()
  35. session, err := CreateSession()
  36. if err != nil {
  37. panic(err)
  38. }
  39. session.RegisterHistoricMarketDataEvent(HistoricEvent)
  40. req := request.NewHistoricSubscriptionRequest()
  41. session.Subscribe(req, func(err error) {
  42. if err == nil {
  43. var req1 lmaxapi.IRequest
  44. id, err := strconv.Atoi(*bookId)
  45. if err != nil {
  46. panic(err)
  47. }
  48. if *ty == "tick" {
  49. req1 = request.NewTopOfBookHistoricRequest(*bookId, int64(id),
  50. time.Date(2014, 1, 1, 0, 0, 0, 0, time.UTC),
  51. time.Now(),
  52. "CSV")
  53. } else {
  54. req1 = request.NewAggregateHistoricRequest(*bookId, int64(id),
  55. time.Date(2014, 1, 1, 0, 0, 0, 0, time.UTC),
  56. time.Now(),
  57. "CSV", //暂时只有一种格式
  58. "MINUTE", //还可以 DAY, 也就是只有分钟线 和 日线两种
  59. []string{"BID"}) //BID/ASK/TRADE 可以是这三种形式
  60. }
  61. session.RequestHistoricMarketData(req1, func(err error) {
  62. if err != nil {
  63. fmt.Println("Failed to request historic market data:", err)
  64. }
  65. })
  66. } else {
  67. fmt.Println("Failed to subscribe:", err)
  68. }
  69. })
  70. sub := request.NewOrderBookSubscriptionRequest(4001)
  71. session.Subscribe(sub, func(err error) {
  72. if err != nil {
  73. log.Println(err)
  74. }
  75. })
  76. common.Heartbeat(session)
  77. session.Start()
  78. }
  79. func HistoricEvent(s *lmaxapi.Session, event *response.HistoricMarketDataEvent) {
  80. t := time.Now()
  81. log.Println("@@@@@@@:len(event.Urls)=", len(event.Urls))
  82. downerr := downloadAndRetry(s, event.Urls, "./lmax", 2)
  83. fmt.Println(downerr)
  84. s.Stop()
  85. fmt.Println(time.Now().Sub(t))
  86. }
  87. //开始下载历史数据
  88. //1. 下载的策略。30个1组,下载完成一组,进入下一组。
  89. //2. 设置一下保存目录,让数据保存在目录里面。
  90. //3. 如果数据不为空,进行完整性校验。
  91. //4. 如果任务失败,那么重试一次。
  92. type downError struct {
  93. url string
  94. err error
  95. }
  96. type work struct {
  97. url string
  98. err error
  99. done bool
  100. }
  101. func download(session *lmaxapi.Session, urls []string, dirname string) []downError {
  102. task, err := newDownTask(session, 20, dirname)
  103. if err != nil {
  104. panic(err)
  105. }
  106. go func() {
  107. for i := 0; i < len(urls); i++ {
  108. task.send(urls[i])
  109. }
  110. }()
  111. var errurl []downError
  112. for i := 0; i < len(urls); i++ {
  113. url, err := task.getResponse()
  114. if err != nil {
  115. errurl = append(errurl, downError{url, err})
  116. continue
  117. }
  118. log.Println("done", url)
  119. }
  120. //结束任务
  121. task.done()
  122. return errurl
  123. }
  124. func downloadAndRetry(session *lmaxapi.Session, urls []string, dirname string, retry int) []downError {
  125. var downerr []downError
  126. for i := 0; i < retry; i++ {
  127. downerr = download(session, urls, dirname)
  128. if downerr == nil {
  129. return nil
  130. }
  131. urls := make([]string, len(downerr))
  132. for j := 0; j < len(downerr); j++ {
  133. urls[j] = downerr[j].url
  134. }
  135. }
  136. return downerr
  137. }
  138. type downTask struct {
  139. maxWork int
  140. dirname string
  141. in chan *work
  142. out chan *work
  143. session *lmaxapi.Session
  144. }
  145. func newDownTask(session *lmaxapi.Session, max int, dirname string) (*downTask, error) {
  146. task := &downTask{}
  147. task.maxWork = max
  148. task.dirname = dirname
  149. task.session = session
  150. err := os.MkdirAll(dirname, 0777)
  151. if err != nil {
  152. return nil, err
  153. }
  154. task.in = make(chan *work)
  155. task.out = make(chan *work)
  156. for i := 0; i < max; i++ {
  157. go task.dowork()
  158. }
  159. return task, nil
  160. }
  161. func (task *downTask) dowork() {
  162. for {
  163. w := <-task.in
  164. if w.done == true {
  165. break
  166. }
  167. url := w.url
  168. task.out <- task.downloadOne(url)
  169. }
  170. }
  171. func (task *downTask) getResponse() (string, error) {
  172. w := <-task.out
  173. return w.url, w.err
  174. }
  175. func (task *downTask) send(url string) {
  176. task.in <- &work{url, nil, false}
  177. }
  178. func (task *downTask) done() {
  179. for i := 0; i < task.maxWork; i++ {
  180. task.in <- &work{"", nil, true}
  181. }
  182. }
  183. func (task *downTask) downloadOne(u string) *work {
  184. surl, err := url.Parse(u)
  185. if err != nil {
  186. return &work{u, err, false}
  187. }
  188. urlpath := surl.Path
  189. filename := task.dirname + urlpath
  190. fullpath := path.Dir(filename)
  191. os.MkdirAll(fullpath, 0777)
  192. ok := checkFile(filename)
  193. if ok { //没有必要重新下载数据了
  194. return &work{u, nil, false}
  195. } else {
  196. os.Remove(filename)
  197. }
  198. resp, err := task.session.OpenUrl(u, nil)
  199. // fmt.Println(u)
  200. if err != nil {
  201. return &work{u, err, false}
  202. }
  203. defer resp.Body.Close()
  204. //下载数据保存到文件里面
  205. w, e := os.Create(filename)
  206. if e != nil {
  207. return &work{u, e, false}
  208. }
  209. _, e = io.Copy(w, resp.Body)
  210. w.Close()
  211. if e != nil { //发生错误了
  212. //删除文件
  213. os.Remove(filename)
  214. return &work{u, e, false}
  215. }
  216. e = hashfile(filename)
  217. return &work{u, nil, false}
  218. }
  219. func crc32file(filename string) (uint32, error) {
  220. crc := crc32.NewIEEE()
  221. r, err := os.Open(filename)
  222. if err != nil {
  223. return 0, err
  224. }
  225. defer r.Close()
  226. _, err = io.Copy(crc, r)
  227. if err != nil {
  228. return 0, err
  229. }
  230. return crc.Sum32(), nil
  231. }
  232. func hashfile(filename string) error {
  233. hashsave := filename + ".hash"
  234. file, err := os.Create(hashsave)
  235. if err != nil {
  236. return err
  237. }
  238. defer file.Close()
  239. crc, err := crc32file(filename)
  240. if err != nil {
  241. return err
  242. }
  243. _, err = file.WriteString(fmt.Sprint(crc))
  244. if err != nil {
  245. return err
  246. }
  247. return nil
  248. }
  249. func checkFile(local string) bool {
  250. file, err := os.Open(local)
  251. if err != nil {
  252. return false
  253. }
  254. defer file.Close()
  255. stat, err := file.Stat()
  256. if err != nil {
  257. return false
  258. }
  259. if stat.Size() == 0 {
  260. return false
  261. }
  262. //checksum
  263. crc := crc32.NewIEEE()
  264. _, err = io.Copy(crc, file)
  265. if err != nil {
  266. return false
  267. }
  268. sum := fmt.Sprint(crc.Sum32())
  269. //compare
  270. b, err := ioutil.ReadFile(local + ".hash")
  271. if sum != string(b) {
  272. return false
  273. }
  274. return true
  275. }