123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- package main
- import (
- "flag"
- "fmt"
- "hash/crc32"
- "io"
- "io/ioutil"
- "tickserver/api/lmaxapi"
- "tickserver/api/lmaxapi/main/common"
- "tickserver/api/lmaxapi/request"
- "tickserver/api/lmaxapi/response"
- "log"
- "net/url"
- "os"
- "path"
- "runtime"
- "time"
- "strconv"
- )
- var ty = flag.String("t", "tick", "tick or M1")
- var bookId = flag.String("bookid", "4001", "lmax book id")
- func CreateSession() (*lmaxapi.Session, error) {
- lmaxapi.SetBaseUrl("https://trade.lmaxtrader.com")
- req := request.NewLoginRequest("wave2907", "Tg417396", request.ProductType.CFD_LIVE)
- session, err := lmaxapi.Login(req)
- if err == nil {
- log.Println("My accountId is:", session.GetAccountDetails().AccountId)
- }
- return session, err
- }
- func main() {
- runtime.GOMAXPROCS(runtime.NumCPU())
- common.EnableLog("proto.log")
- flag.Parse()
- session, err := CreateSession()
- if err != nil {
- panic(err)
- }
- session.RegisterHistoricMarketDataEvent(HistoricEvent)
- req := request.NewHistoricSubscriptionRequest()
- session.Subscribe(req, func(err error) {
- if err == nil {
- var req1 lmaxapi.IRequest
- id, err := strconv.Atoi(*bookId)
- if err != nil {
- panic(err)
- }
- if *ty == "tick" {
- req1 = request.NewTopOfBookHistoricRequest(*bookId, int64(id),
- time.Date(2014, 1, 1, 0, 0, 0, 0, time.UTC),
- time.Now(),
- "CSV")
- } else {
- req1 = request.NewAggregateHistoricRequest(*bookId, int64(id),
- time.Date(2014, 1, 1, 0, 0, 0, 0, time.UTC),
- time.Now(),
- "CSV", //暂时只有一种格式
- "MINUTE", //还可以 DAY, 也就是只有分钟线 和 日线两种
- []string{"BID"}) //BID/ASK/TRADE 可以是这三种形式
- }
- session.RequestHistoricMarketData(req1, func(err error) {
- if err != nil {
- fmt.Println("Failed to request historic market data:", err)
- }
- })
- } else {
- fmt.Println("Failed to subscribe:", err)
- }
- })
- sub := request.NewOrderBookSubscriptionRequest(4001)
- session.Subscribe(sub, func(err error) {
- if err != nil {
- log.Println(err)
- }
- })
- common.Heartbeat(session)
- session.Start()
- }
- func HistoricEvent(s *lmaxapi.Session, event *response.HistoricMarketDataEvent) {
- t := time.Now()
- log.Println("@@@@@@@:len(event.Urls)=", len(event.Urls))
- downerr := downloadAndRetry(s, event.Urls, "./lmax", 2)
- fmt.Println(downerr)
- s.Stop()
- fmt.Println(time.Now().Sub(t))
- }
- //开始下载历史数据
- //1. 下载的策略。30个1组,下载完成一组,进入下一组。
- //2. 设置一下保存目录,让数据保存在目录里面。
- //3. 如果数据不为空,进行完整性校验。
- //4. 如果任务失败,那么重试一次。
- type downError struct {
- url string
- err error
- }
- type work struct {
- url string
- err error
- done bool
- }
- func download(session *lmaxapi.Session, urls []string, dirname string) []downError {
- task, err := newDownTask(session, 20, dirname)
- if err != nil {
- panic(err)
- }
- go func() {
- for i := 0; i < len(urls); i++ {
- task.send(urls[i])
- }
- }()
- var errurl []downError
- for i := 0; i < len(urls); i++ {
- url, err := task.getResponse()
- if err != nil {
- errurl = append(errurl, downError{url, err})
- continue
- }
- log.Println("done", url)
- }
- //结束任务
- task.done()
- return errurl
- }
- func downloadAndRetry(session *lmaxapi.Session, urls []string, dirname string, retry int) []downError {
- var downerr []downError
- for i := 0; i < retry; i++ {
- downerr = download(session, urls, dirname)
- if downerr == nil {
- return nil
- }
- urls := make([]string, len(downerr))
- for j := 0; j < len(downerr); j++ {
- urls[j] = downerr[j].url
- }
- }
- return downerr
- }
- type downTask struct {
- maxWork int
- dirname string
- in chan *work
- out chan *work
- session *lmaxapi.Session
- }
- func newDownTask(session *lmaxapi.Session, max int, dirname string) (*downTask, error) {
- task := &downTask{}
- task.maxWork = max
- task.dirname = dirname
- task.session = session
- err := os.MkdirAll(dirname, 0777)
- if err != nil {
- return nil, err
- }
- task.in = make(chan *work)
- task.out = make(chan *work)
- for i := 0; i < max; i++ {
- go task.dowork()
- }
- return task, nil
- }
- func (task *downTask) dowork() {
- for {
- w := <-task.in
- if w.done == true {
- break
- }
- url := w.url
- task.out <- task.downloadOne(url)
- }
- }
- func (task *downTask) getResponse() (string, error) {
- w := <-task.out
- return w.url, w.err
- }
- func (task *downTask) send(url string) {
- task.in <- &work{url, nil, false}
- }
- func (task *downTask) done() {
- for i := 0; i < task.maxWork; i++ {
- task.in <- &work{"", nil, true}
- }
- }
- func (task *downTask) downloadOne(u string) *work {
- surl, err := url.Parse(u)
- if err != nil {
- return &work{u, err, false}
- }
- urlpath := surl.Path
- filename := task.dirname + urlpath
- fullpath := path.Dir(filename)
- os.MkdirAll(fullpath, 0777)
- ok := checkFile(filename)
- if ok { //没有必要重新下载数据了
- return &work{u, nil, false}
- } else {
- os.Remove(filename)
- }
- resp, err := task.session.OpenUrl(u, nil)
- // fmt.Println(u)
- if err != nil {
- return &work{u, err, false}
- }
- defer resp.Body.Close()
- //下载数据保存到文件里面
- w, e := os.Create(filename)
- if e != nil {
- return &work{u, e, false}
- }
- _, e = io.Copy(w, resp.Body)
- w.Close()
- if e != nil { //发生错误了
- //删除文件
- os.Remove(filename)
- return &work{u, e, false}
- }
- e = hashfile(filename)
- return &work{u, nil, false}
- }
- func crc32file(filename string) (uint32, error) {
- crc := crc32.NewIEEE()
- r, err := os.Open(filename)
- if err != nil {
- return 0, err
- }
- defer r.Close()
- _, err = io.Copy(crc, r)
- if err != nil {
- return 0, err
- }
- return crc.Sum32(), nil
- }
- func hashfile(filename string) error {
- hashsave := filename + ".hash"
- file, err := os.Create(hashsave)
- if err != nil {
- return err
- }
- defer file.Close()
- crc, err := crc32file(filename)
- if err != nil {
- return err
- }
- _, err = file.WriteString(fmt.Sprint(crc))
- if err != nil {
- return err
- }
- return nil
- }
- func checkFile(local string) bool {
- file, err := os.Open(local)
- if err != nil {
- return false
- }
- defer file.Close()
- stat, err := file.Stat()
- if err != nil {
- return false
- }
- if stat.Size() == 0 {
- return false
- }
- //checksum
- crc := crc32.NewIEEE()
- _, err = io.Copy(crc, file)
- if err != nil {
- return false
- }
- sum := fmt.Sprint(crc.Sum32())
- //compare
- b, err := ioutil.ReadFile(local + ".hash")
- if sum != string(b) {
- return false
- }
- return true
- }
|