snap.go 6.8 KB


  1. package tick
  2. //snap last hour ticks
  3. import "errors"
  4. import "encoding/json"
  5. import "encoding/gob"
  6. import "os"
  7. import "compress/gzip"
  8. import "database/sql"
  9. import "time"
  10. import "log"
  11. import "fmt"
  // Sentinel errors returned by the snapshot routines.
  var (
  	// ErrNotInSnapTime is returned by SnapHour when inSanpTime reports that
  	// the candidate hour may not be snapshotted yet.
  	ErrNotInSnapTime = errors.New("ErrNotInSnapTime")

  	// ErrBegTime is returned by newTickFile when the requested begin time is
  	// older than the newest begtime already recorded in tick_index.
  	ErrBegTime = errors.New("ErrBegTime")
  )
  // getHourTime truncates the millisecond timestamp t to a multiple of one
  // hour (3600*1000 ms) and returns the result.
  func getHourTime(t int64) int64 {
  	const hourMillis int64 = 3600 * 1000
  	// Dropping the remainder is the same as dividing and multiplying back.
  	return t - t%hourMillis
  }
  17. func SnapAll() error {
  18. for name, dsConf := range serverconf.DsMap {
  19. if dsConf.Run {
  20. for {
  21. err := SnapHour(name)
  22. if err != nil {
  23. log.Println(err)
  24. break
  25. }
  26. }
  27. }
  28. }
  29. return nil
  30. }
  31. //做快照
  32. func SnapHour(tyin string) error {
  33. //read first line
  34. q := "select `time`, `ty` from tick_log where ty = ? order by `time` limit 1"
  35. row := db.QueryRow(q, tyin)
  36. var t int64
  37. var ty string
  38. err := row.Scan(&t, &ty)
  39. if err != nil {
  40. log.Println("1")
  41. return err
  42. }
  43. begTime := getHourTime(t)
  44. if !inSanpTime(ty, begTime) {
  45. return ErrNotInSnapTime
  46. }
  47. //事务处理
  48. q = "select data from tick_log where ty = ? and `time` >= ? and `time` <= ? order by `time`"
  49. endTime := begTime + 3600 * 1000 - 1
  50. rows, err := db.Query(q, ty, begTime, endTime)
  51. if err != nil {
  52. return err
  53. }
  54. defer rows.Close()
  55. file, err := newTickFile(ty, begTime)
  56. if err != nil {
  57. return err
  58. }
  59. defer file.Close()
  60. var count int
  61. for rows.Next() {
  62. var o string
  63. count++
  64. if count % 10000 == 0 {
  65. log.Println("read tick ...", count)
  66. }
  67. if err := rows.Scan(&o); err != nil {
  68. log.Println("3")
  69. return err
  70. }
  71. var m Market
  72. err := json.Unmarshal([]byte(o), &m)
  73. if err != nil {
  74. return err
  75. }
  76. err = file.AddTick(&m)
  77. if err != nil {
  78. return err
  79. }
  80. }
  81. if err := rows.Err(); err != nil {
  82. return err
  83. }
  84. //index and clear, index and clear in transaction
  85. err = file.Index()
  86. if err != nil {
  87. return err
  88. }
  89. return nil
  90. }
  91. func inSanpTime(ty string, i int64) bool {
  92. //这个starttime 是两个小时前的数据
  93. i += 3600 * 1000 * 2
  94. q := "select id from tick_log where `time` > ? and ty = ? order by `time` limit 1"
  95. row := db.QueryRow(q, i, ty)
  96. var id int64
  97. err := row.Scan(&id)
  98. if err != nil {
  99. log.Println("4")
  100. return false
  101. }
  102. return id > 0
  103. }
  // tickFile is a gzip-compressed, gob-encoded snapshot file being written,
  // together with the bookkeeping that Index later stores in tick_index.
  type tickFile struct {
  	ty      string // tick type this file belongs to
  	begTime int64  // requested begin time; replaced by the first tick's timestamp in AddTick
  	endTime int64  // timestamp of the last tick added
  	count   int64  // number of ticks added so far

  	w          *gzip.Writer // compresses into file
  	file       *os.File     // underlying file on disk
  	gobencoder *gob.Encoder // encodes ticks into w
  	path       string       // full path of file
  }
  114. func newTickFile(ty string, begTime int64) (*tickFile, error) {
  115. o := &tickFile{}
  116. //begTime must big than max_endtime of the same type
  117. q := "select max(begtime) from tick_index where ty = ?"
  118. row := db.QueryRow(q, ty)
  119. var _max_endtime sql.NullInt64
  120. var max_endtime int64
  121. err := row.Scan(&_max_endtime)
  122. if err == sql.ErrNoRows {
  123. //do nothing
  124. } else if err != nil {
  125. return nil, err
  126. }
  127. if _max_endtime.Valid {
  128. max_endtime = _max_endtime.Int64
  129. }
  130. if max_endtime > 0 && begTime < max_endtime {
  131. log.Println("delete some log")
  132. q := fmt.Sprintf("delete from tick_log where ty = '%s' and `time` < %d", ty, max_endtime+3600*1000 - 1)
  133. log.Println(q, begTime, max_endtime)
  134. _, err := db.Exec(q)
  135. if err != nil {
  136. return nil, err
  137. }
  138. return nil, ErrBegTime
  139. }
  140. o.begTime = begTime
  141. o.ty = ty
  142. ticktime := time.Unix(begTime / 1000, 0).Format("2006_01_02_15")
  143. filename := ty + "_" + ticktime + ".gz"
  144. path, err := getFilePath(ty)
  145. if path == "" || err != nil {
  146. return nil, err
  147. }
  148. os.MkdirAll(path, 0777)
  149. fullpath := path + "/" + filename
  150. file, err := os.Create(fullpath)
  151. if err != nil {
  152. return nil, err
  153. }
  154. o.path = fullpath
  155. o.w = gzip.NewWriter(file)
  156. o.file = file
  157. o.gobencoder = gob.NewEncoder(o.w)
  158. return o, err
  159. }
  160. func (tf *tickFile) Close() error {
  161. tf.w.Close()
  162. return tf.file.Close()
  163. }
  164. func (tf *tickFile) AddTick(m *Market) error {
  165. //check range
  166. if m.Timestamp < tf.begTime || m.Timestamp >= tf.begTime + 3600 * 1000 {
  167. return ErrTimeRange
  168. }
  169. if tf.count == 0 {
  170. tf.begTime = m.Timestamp
  171. tf.endTime = m.Timestamp
  172. } else {
  173. //check time order
  174. if m.Timestamp < tf.endTime {
  175. return ErrTimeOrder
  176. }
  177. tf.endTime = m.Timestamp
  178. }
  179. tf.count++
  180. return tf.gobencoder.Encode(m)
  181. }
  182. func (tf *tickFile) Index() error {
  183. if tf.count == 0 {
  184. return ErrNoData
  185. }
  186. //要么成功,要么就是失败
  187. tx, err := db.Begin()
  188. if err != nil {
  189. return err
  190. }
  191. q := fmt.Sprintf("insert into tick_index (begtime, endtime, path, ty, tickcount) values ('%d', '%d', '%s', '%s', '%d')",
  192. tf.begTime, tf.endTime, tf.path, tf.ty, tf.count)
  193. _, err = tx.Exec(q)
  194. if err != nil {
  195. tx.Rollback()
  196. return err
  197. }
  198. err = tf.updateTotalCount(tx, tf.ty)
  199. if err != nil {
  200. tx.Rollback()
  201. return err
  202. }
  203. err = tf.deleteLog(tx, tf.ty, tf.begTime, tf.endTime)
  204. if err != nil {
  205. tx.Rollback()
  206. return err
  207. }
  208. return tx.Commit()
  209. }
  210. func (tf *tickFile) updateTotalCount(tx *sql.Tx, ty string) error {
  211. q := "select id,tickcount,totalcount from tick_index where ty = '"+ty+"' and totalcount > 0 order by id desc limit 1"
  212. row := tx.QueryRow(q)
  213. var id int64
  214. var tickcount int64
  215. var totalcount int64
  216. err := row.Scan(&id, &tickcount, &totalcount)
  217. if err == sql.ErrNoRows {
  218. totalcount = 0
  219. } else if err != nil {
  220. return err
  221. }
  222. log.Println("last total count", totalcount)
  223. //update total count
  224. q = "select id, tickcount, totalcount from tick_index where ty = '"+ty+"' and totalcount = 0 order by id"
  225. data, err := fetchAll(tx, q)
  226. if err != nil {
  227. return err
  228. }
  229. for i := 0; i < len(data); i++ {
  230. totalcount += data[i].tickcount
  231. q = fmt.Sprintf("update tick_index set totalcount = '%d' where id = '%d'", totalcount, data[i].id)
  232. _, err := tx.Exec(q)
  233. if err != nil {
  234. return err
  235. }
  236. }
  237. return nil
  238. }
  239. type tickIndex struct {
  240. id int64
  241. tickcount int64
  242. totalcount int64
  243. }
  244. func fetchAll(tx *sql.Tx, q string) ([]tickIndex, error) {
  245. rows, err := tx.Query(q)
  246. if err != nil {
  247. return nil, err
  248. }
  249. defer rows.Close()
  250. var ret []tickIndex
  251. for rows.Next() {
  252. var index tickIndex
  253. if err := rows.Scan(&index.id, &index.tickcount, &index.totalcount); err != nil {
  254. return nil, err
  255. }
  256. ret = append(ret, index)
  257. }
  258. if err := rows.Err(); err != nil {
  259. return nil, err
  260. }
  261. return ret, nil
  262. }
  263. func (tf *tickFile) deleteLog(tx *sql.Tx, ty string, begTime int64, endTime int64) error {
  264. _, err := tx.Exec("delete from tick_log where ty = ? and `time` >= ? and `time` <= ?", ty, begTime, endTime)
  265. return err
  266. }