package main import ( "flag" "fmt" "hash/crc32" "io" "io/ioutil" "tickserver/api/lmaxapi" "tickserver/api/lmaxapi/main/common" "tickserver/api/lmaxapi/request" "tickserver/api/lmaxapi/response" "log" "net/url" "os" "path" "runtime" "time" "strconv" ) var ty = flag.String("t", "tick", "tick or M1") var bookId = flag.String("bookid", "4001", "lmax book id") func CreateSession() (*lmaxapi.Session, error) { lmaxapi.SetBaseUrl("https://trade.lmaxtrader.com") req := request.NewLoginRequest("wave2907", "Tg417396", request.ProductType.CFD_LIVE) session, err := lmaxapi.Login(req) if err == nil { log.Println("My accountId is:", session.GetAccountDetails().AccountId) } return session, err } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) common.EnableLog("proto.log") flag.Parse() session, err := CreateSession() if err != nil { panic(err) } session.RegisterHistoricMarketDataEvent(HistoricEvent) req := request.NewHistoricSubscriptionRequest() session.Subscribe(req, func(err error) { if err == nil { var req1 lmaxapi.IRequest id, err := strconv.Atoi(*bookId) if err != nil { panic(err) } if *ty == "tick" { req1 = request.NewTopOfBookHistoricRequest(*bookId, int64(id), time.Date(2014, 1, 1, 0, 0, 0, 0, time.UTC), time.Now(), "CSV") } else { req1 = request.NewAggregateHistoricRequest(*bookId, int64(id), time.Date(2014, 1, 1, 0, 0, 0, 0, time.UTC), time.Now(), "CSV", //暂时只有一种格式 "MINUTE", //还可以 DAY, 也就是只有分钟线 和 日线两种 []string{"BID"}) //BID/ASK/TRADE 可以是这三种形式 } session.RequestHistoricMarketData(req1, func(err error) { if err != nil { fmt.Println("Failed to request historic market data:", err) } }) } else { fmt.Println("Failed to subscribe:", err) } }) sub := request.NewOrderBookSubscriptionRequest(4001) session.Subscribe(sub, func(err error) { if err != nil { log.Println(err) } }) common.Heartbeat(session) session.Start() } func HistoricEvent(s *lmaxapi.Session, event *response.HistoricMarketDataEvent) { t := time.Now() log.Println("@@@@@@@:len(event.Urls)=", len(event.Urls)) downerr := downloadAndRetry(s, event.Urls, "./lmax", 2) fmt.Println(downerr) s.Stop() fmt.Println(time.Now().Sub(t)) } //开始下载历史数据 //1. 下载的策略。30个1组,下载完成一组,进入下一组。 //2. 设置一下保存目录,让数据保存在目录里面。 //3. 如果数据不为空,进行完整性校验。 //4. 如果任务失败,那么重试一次。 type downError struct { url string err error } type work struct { url string err error done bool } func download(session *lmaxapi.Session, urls []string, dirname string) []downError { task, err := newDownTask(session, 20, dirname) if err != nil { panic(err) } go func() { for i := 0; i < len(urls); i++ { task.send(urls[i]) } }() var errurl []downError for i := 0; i < len(urls); i++ { url, err := task.getResponse() if err != nil { errurl = append(errurl, downError{url, err}) continue } log.Println("done", url) } //结束任务 task.done() return errurl } func downloadAndRetry(session *lmaxapi.Session, urls []string, dirname string, retry int) []downError { var downerr []downError for i := 0; i < retry; i++ { downerr = download(session, urls, dirname) if downerr == nil { return nil } urls := make([]string, len(downerr)) for j := 0; j < len(downerr); j++ { urls[j] = downerr[j].url } } return downerr } type downTask struct { maxWork int dirname string in chan *work out chan *work session *lmaxapi.Session } func newDownTask(session *lmaxapi.Session, max int, dirname string) (*downTask, error) { task := &downTask{} task.maxWork = max task.dirname = dirname task.session = session err := os.MkdirAll(dirname, 0777) if err != nil { return nil, err } task.in = make(chan *work) task.out = make(chan *work) for i := 0; i < max; i++ { go task.dowork() } return task, nil } func (task *downTask) dowork() { for { w := <-task.in if w.done == true { break } url := w.url task.out <- task.downloadOne(url) } } func (task *downTask) getResponse() (string, error) { w := <-task.out return w.url, w.err } func (task *downTask) send(url string) { task.in <- &work{url, nil, false} } func (task *downTask) done() { for i := 0; i < task.maxWork; i++ { task.in <- &work{"", nil, true} } } func (task *downTask) downloadOne(u string) *work { surl, err := url.Parse(u) if err != nil { return &work{u, err, false} } urlpath := surl.Path filename := task.dirname + urlpath fullpath := path.Dir(filename) os.MkdirAll(fullpath, 0777) ok := checkFile(filename) if ok { //没有必要重新下载数据了 return &work{u, nil, false} } else { os.Remove(filename) } resp, err := task.session.OpenUrl(u, nil) // fmt.Println(u) if err != nil { return &work{u, err, false} } defer resp.Body.Close() //下载数据保存到文件里面 w, e := os.Create(filename) if e != nil { return &work{u, e, false} } _, e = io.Copy(w, resp.Body) w.Close() if e != nil { //发生错误了 //删除文件 os.Remove(filename) return &work{u, e, false} } e = hashfile(filename) return &work{u, nil, false} } func crc32file(filename string) (uint32, error) { crc := crc32.NewIEEE() r, err := os.Open(filename) if err != nil { return 0, err } defer r.Close() _, err = io.Copy(crc, r) if err != nil { return 0, err } return crc.Sum32(), nil } func hashfile(filename string) error { hashsave := filename + ".hash" file, err := os.Create(hashsave) if err != nil { return err } defer file.Close() crc, err := crc32file(filename) if err != nil { return err } _, err = file.WriteString(fmt.Sprint(crc)) if err != nil { return err } return nil } func checkFile(local string) bool { file, err := os.Open(local) if err != nil { return false } defer file.Close() stat, err := file.Stat() if err != nil { return false } if stat.Size() == 0 { return false } //checksum crc := crc32.NewIEEE() _, err = io.Copy(crc, file) if err != nil { return false } sum := fmt.Sprint(crc.Sum32()) //compare b, err := ioutil.ReadFile(local + ".hash") if sum != string(b) { return false } return true }