history.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. // history.go
  2. package tick
  3. //import "log"
  4. import "sync"
  5. import "sort"
  6. import "time"
  7. import "tickserver/server/market"
  8. import "net/http"
  9. import "net/url"
  10. import "path"
  11. import "io"
  12. import "os"
  13. import "compress/gzip"
  14. import "encoding/binary"
  15. import "tickserver/framework/base"
  16. import "unsafe"
  17. import "strings"
  18. type ParseFileInfo struct {
  19. fname string
  20. begtime int64
  21. }
  22. type FileCandleMaker struct {
  23. candleGenerators []*base.Candle
  24. ohlcs []base.Ohlc
  25. candless [][]market.Candle
  26. dayLasts []int64
  27. }
  28. type CandleMaker struct {
  29. gds *GeneralDS
  30. typ string
  31. typId int
  32. dataDir string
  33. url string
  34. fileserver string
  35. db *market.MyDB
  36. client *http.Client
  37. hiss []TickIndex
  38. hmu sync.Mutex
  39. files []ParseFileInfo
  40. fmu sync.Mutex
  41. m2ch chan *Market2
  42. fileCandleMakersMap map[string]*FileCandleMaker
  43. tmpFileNameMap map[string]int
  44. }
  45. var hisTable = "history"
  46. var periodSet = []int{market.M1, market.M5, market.H1, market.D1}
  47. type byHisInfo []TickIndex
  48. func (a byHisInfo) Len() int { return len(a) }
  49. func (a byHisInfo) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  50. func (a byHisInfo) Less(i, j int) bool { return a[i].Begtime < a[j].Begtime }
  51. func (cm *CandleMaker) run() {
  52. lasttime, err := cm.db.GetHisLastTime(hisTable, cm.typ)
  53. if err != nil {
  54. //log.Println(err)
  55. return
  56. }
  57. cm.fileCandleMakersMap = make(map[string]*FileCandleMaker)
  58. cm.tmpFileNameMap = make(map[string]int)
  59. cm.m2ch = make(chan *Market2, 20480)
  60. go cm.makeCandle()
  61. go cm.parse()
  62. go cm.download()
  63. for {
  64. hiss, err := cm.getHisList(lasttime)
  65. if err != nil {
  66. //log.Println(err)
  67. }
  68. cm.hmu.Lock()
  69. cm.hiss = append(cm.hiss, hiss...)
  70. sort.Sort(byHisInfo(cm.hiss))
  71. cm.hmu.Unlock()
  72. hisnum := len(cm.hiss)
  73. if hisnum > 0 {
  74. lasttime = cm.hiss[hisnum-1].Begtime + 1
  75. }
  76. time.Sleep(time.Minute * 1)
  77. }
  78. }
  79. func (cm *CandleMaker) getHisList(lasttime int64) ([]TickIndex, error) {
  80. var hislist []TickIndex
  81. var offset int
  82. num := 1000
  83. for num >= 1000 {
  84. req := &DownloadRequest{Type: cm.typ, Start: lasttime, End: 0, Offset: offset, Count: 1000, OrderBy: "time asc"}
  85. //log.Println("history", req)
  86. body, err := httpReq(cm.client, "history", cm.url, req)
  87. if err != nil {
  88. //log.Println("httpReq", err)
  89. return hislist, err
  90. }
  91. var ticks []TickIndex
  92. _, err = decodeResp(body, &ticks)
  93. if err != nil {
  94. //log.Println("decodeResp", err)
  95. return hislist, err
  96. }
  97. //log.Println("history num:", len(ticks))
  98. hislist = append(hislist, ticks...)
  99. //log.Println(ticks)
  100. num = len(ticks)
  101. offset += num
  102. }
  103. return hislist, nil
  104. }
  105. func (cm *CandleMaker) download() {
  106. for {
  107. bHas := false
  108. var ti TickIndex
  109. cm.hmu.Lock()
  110. if len(cm.hiss) > 0 {
  111. ti = cm.hiss[0]
  112. bHas = true
  113. }
  114. cm.hmu.Unlock()
  115. if bHas {
  116. fname, err := cm.downloadOne(ti)
  117. if err == nil {
  118. cm.hmu.Lock()
  119. cm.hiss = cm.hiss[1:]
  120. cm.hmu.Unlock()
  121. //log.Println("history download:", fname)
  122. cm.fmu.Lock()
  123. cm.files = append(cm.files, ParseFileInfo{fname: fname, begtime: ti.Begtime})
  124. cm.fmu.Unlock()
  125. } else {
  126. //log.Println("history download:", ti, err)
  127. time.Sleep(1 * time.Minute)
  128. }
  129. } else {
  130. time.Sleep(1 * time.Second)
  131. }
  132. }
  133. }
  134. func (cm *CandleMaker) downloadOne(ti TickIndex) (string, error) {
  135. u, err := url.Parse(ti.Path)
  136. if err != nil {
  137. ti.Path = "http://" + cm.fileserver + ti.Path
  138. } else {
  139. if u.Scheme == "" {
  140. u.Scheme = "http"
  141. }
  142. if u.Host == "" {
  143. u.Host = cm.fileserver
  144. }
  145. ti.Path = u.String()
  146. }
  147. //u = ti.Path
  148. //u = strings.Replace(u, "fzm", "", 1)
  149. res, err := http.Get(ti.Path)
  150. if err != nil {
  151. return "", err
  152. }
  153. defer res.Body.Close()
  154. surl, err := url.Parse(ti.Path)
  155. if err != nil {
  156. return "", err
  157. }
  158. fname := path.Join(cm.dataDir, "tmp", surl.Path)
  159. dir := path.Dir(fname)
  160. os.MkdirAll(dir, 0777)
  161. w, err := os.Create(fname)
  162. if err != nil {
  163. return "", err
  164. }
  165. //log.Println(ti.Path)
  166. defer w.Close()
  167. _, err = io.Copy(w, res.Body)
  168. if err != nil {
  169. return "", err
  170. }
  171. return fname, nil
  172. }
  173. func (cm *CandleMaker) parse() {
  174. // 保存数据到文件
  175. ticker := time.Tick(time.Second * 30)
  176. for t := range ticker {
  177. if t.Hour() == 0 && t.Minute() == 30 { // 8:30点时保存
  178. if len(cm.files) > 0 {
  179. for len(cm.files) > 0 {
  180. bHas := false
  181. var pfi ParseFileInfo
  182. cm.fmu.Lock()
  183. if len(cm.files) > 0 {
  184. pfi = cm.files[0]
  185. bHas = true
  186. }
  187. cm.fmu.Unlock()
  188. if bHas {
  189. err := cm.ParseOne(pfi.fname)
  190. cm.fmu.Lock()
  191. cm.files = cm.files[1:]
  192. cm.fmu.Unlock()
  193. if err != nil {
  194. os.Rename(pfi.fname, pfi.fname+".bad")
  195. //log.Println("history parse:", len(cm.files), pfi.fname, err)
  196. } else {
  197. os.Remove(pfi.fname)
  198. //log.Println("history parse:", len(cm.files), pfi.fname)
  199. if len(cm.files) == 0 {
  200. err = cm.db.UpdateHisLastTime(hisTable, cm.typ, pfi.begtime+1) //tks[len(tks)-1].Timestamp
  201. if err != nil {
  202. //log.Println("cm.db.UpdateHisLastTime", err, cm.typ, pfi.begtime) //tks[len(tks)-1].Timestamp
  203. }
  204. }
  205. }
  206. }
  207. }
  208. //notify no more data
  209. cm.m2ch <- nil
  210. }
  211. }
  212. }
  213. }
  214. func (cm *CandleMaker) ParseOne(fname string) error {
  215. f, err := os.Open(fname)
  216. if err != nil {
  217. return err
  218. }
  219. defer f.Close()
  220. gr, err := gzip.NewReader(f)
  221. if err != nil {
  222. return err
  223. }
  224. defer gr.Close()
  225. var lasttime, readonly, lastcount int32
  226. err = binary.Read(gr, binary.LittleEndian, &lasttime)
  227. if err != nil {
  228. return err
  229. }
  230. err = binary.Read(gr, binary.LittleEndian, &readonly)
  231. if err != nil {
  232. return err
  233. }
  234. err = binary.Read(gr, binary.LittleEndian, &lastcount)
  235. if err != nil {
  236. return err
  237. }
  238. for {
  239. m := &Market2{}
  240. err := binary.Read(gr, binary.LittleEndian, m)
  241. if err != nil {
  242. if err != io.EOF {
  243. return err
  244. } else {
  245. return nil
  246. }
  247. }
  248. if m.Type != int64(cm.typId) {
  249. //log.Println("history wrongggggggggg typ", m.Type, cm.typId)
  250. continue
  251. }
  252. cm.m2ch <- m
  253. }
  254. return nil
  255. }
  256. func (cm *CandleMaker) makeCandle() {
  257. dir := path.Join(cm.dataDir, cm.typ)
  258. for {
  259. m := <-cm.m2ch
  260. if m == nil {
  261. for k, v := range cm.fileCandleMakersMap {
  262. for i, _ := range v.candless {
  263. if periodSet[i] != market.D1 {
  264. for n, tmpcandle := range v.candless[i] {
  265. day := tmpcandle.Timestamp / (1000 * 3600 * 24)
  266. if day != v.dayLasts[i] && v.dayLasts[i] != 0 {
  267. fname, err := market.SaveCandlesTmp(dir, k, v.candless[i][:n], periodSet[i], false)
  268. if err != nil {
  269. //log.Println(fname, err)
  270. } else {
  271. cm.tmpFileNameMap[fname] = 0
  272. //log.Println(fname)
  273. }
  274. v.candless[i] = v.candless[i][n:]
  275. }
  276. v.dayLasts[i] = day
  277. }
  278. }
  279. fname, err := market.SaveCandlesTmp(dir, k, v.candless[i], periodSet[i], false)
  280. if err != nil {
  281. //log.Println(fname, err)
  282. } else {
  283. if periodSet[i] != market.D1 {
  284. cm.tmpFileNameMap[fname] = 0
  285. }
  286. //log.Println(fname)
  287. }
  288. v.candless[i] = nil
  289. }
  290. }
  291. for k, _ := range cm.tmpFileNameMap {
  292. fname := strings.TrimSuffix(k, ".tmp")
  293. //log.Println(fname, k)
  294. if fname != k {
  295. if _, err := os.Stat(fname); os.IsNotExist(err) {
  296. if _, err := os.Stat(k); err == nil {
  297. for err = os.Rename(k, fname); err != nil; err = os.Rename(k, fname) {
  298. time.Sleep(time.Second)
  299. }
  300. }
  301. }
  302. }
  303. }
  304. cm.tmpFileNameMap = nil
  305. cm.tmpFileNameMap = make(map[string]int)
  306. continue
  307. }
  308. insIdStr := cm.gds.getInsIdStr(m.InsId)
  309. if insIdStr == "" {
  310. //log.Println("wrong insId:", cm.typ, m.InsId)
  311. continue
  312. }
  313. fcm, ok := cm.fileCandleMakersMap[insIdStr]
  314. if !ok {
  315. candleGenerators := make([]*base.Candle, len(periodSet))
  316. ohlcs := make([]base.Ohlc, len(periodSet))
  317. for i, period := range periodSet {
  318. candleGenerators[i], _ = base.NewCandle(period, 2, nil, 0)
  319. if strings.HasPrefix(insIdStr, Ctp) {
  320. candleGenerators[i].Set(base.CANDLE_AUTOCOMPLETE_MAX, 1)
  321. }
  322. ohlcs[i] = base.Ohlc{}
  323. }
  324. fcm = &FileCandleMaker{
  325. candleGenerators: candleGenerators[:],
  326. ohlcs: ohlcs[:],
  327. candless: make([][]market.Candle, len(periodSet)),
  328. dayLasts: make([]int64, len(periodSet)),
  329. }
  330. cm.fileCandleMakersMap[insIdStr] = fcm
  331. }
  332. for i, candleGenerator := range fcm.candleGenerators {
  333. tg := Mk2Tg(*m)
  334. num := candleGenerator.UpdateTick((*base.Tick)(unsafe.Pointer(&tg)))
  335. var tmpcandles []market.Candle
  336. if num == 0 {
  337. candleGenerator.Next(&fcm.ohlcs[i])
  338. ohlcGo := fcm.ohlcs[i].ToGOStruct()
  339. tmpcandles = append(tmpcandles, OhlcGo2Candle(ohlcGo))
  340. } else if num > 0 {
  341. for j := 0; j < num; j++ {
  342. candleGenerator.Next(&fcm.ohlcs[i])
  343. ohlcGo := fcm.ohlcs[i].ToGOStruct()
  344. tmpcandles = append(tmpcandles, OhlcGo2Candle(ohlcGo))
  345. }
  346. } else {
  347. //log.Println("tick error.")
  348. }
  349. for _, tmpcandle := range tmpcandles {
  350. if periodSet[i] != market.D1 {
  351. day := tmpcandle.Timestamp / (1000 * 3600 * 24)
  352. if day != fcm.dayLasts[i] && fcm.dayLasts[i] != 0 {
  353. fname, err := market.SaveCandlesTmp(dir, insIdStr, fcm.candless[i], periodSet[i], false)
  354. if err != nil {
  355. //log.Println(fname, err)
  356. } else {
  357. cm.tmpFileNameMap[fname] = 0
  358. //log.Println(fname)
  359. }
  360. fcm.candless[i] = nil
  361. }
  362. fcm.dayLasts[i] = day
  363. }
  364. if len(fcm.candless[i]) > 0 && fcm.candless[i][len(fcm.candless[i])-1].Timestamp == tmpcandle.Timestamp {
  365. fcm.candless[i][len(fcm.candless[i])-1] = tmpcandle
  366. } else {
  367. fcm.candless[i] = append(fcm.candless[i], tmpcandle)
  368. }
  369. }
  370. }
  371. }
  372. }
  373. func Mk2Tg(mk Market2) base.TickGo {
  374. var tg base.TickGo
  375. tg.Time = int32(mk.Timestamp / 1000)
  376. tg.Ms = int16(mk.Timestamp % 1000)
  377. tg.Symbol = 0
  378. tg.Bid = float32(mk.LastPrice) //tk.Bid[0]
  379. //tg.Ask = float32(tk.Price) //tk.Ask[0]
  380. tg.Bidv = float32(mk.LastVolume) //tk.Bid[1]
  381. //tg.Askv = int32(tk.Volume) //tk.Ask[1]
  382. return tg
  383. }
  384. func Mk2Tk(mk *Market2) market.Tick {
  385. var tk market.Tick
  386. tk.Timestamp = mk.Timestamp
  387. tk.Price = mk.LastPrice
  388. tk.Volume = mk.LastVolume
  389. tk.Ask[0] = mk.Asks[0][0]
  390. tk.Ask[1] = mk.Asks[0][1]
  391. tk.Bid[0] = mk.Bids[0][0]
  392. tk.Bid[1] = mk.Bids[0][1]
  393. return tk
  394. }
  395. func OhlcGo2Candle(ohlcGo base.OhlcGo) market.Candle {
  396. var c market.Candle
  397. c.Timestamp = int64(ohlcGo.Time) * 1000
  398. c.Open = ohlcGo.Open
  399. c.High = ohlcGo.High
  400. c.Low = ohlcGo.Low
  401. c.Close = ohlcGo.Close
  402. c.TickVolums = float64(ohlcGo.TickVolumn)
  403. c.RealVolums = float64(ohlcGo.RealVolumn)
  404. return c
  405. }