package tick //snap last hour ticks import "errors" import "encoding/json" import "encoding/gob" import "os" import "compress/gzip" import "database/sql" import "time" import "log" import "fmt" var ErrNotInSnapTime = errors.New("ErrNotInSnapTime") var ErrBegTime = errors.New("ErrBegTime") func getHourTime(t int64) int64 { return (t / (3600 * 1000)) * (3600 * 1000) } func SnapAll() error { for name, dsConf := range serverconf.DsMap { if dsConf.Run { for { err := SnapHour(name) if err != nil { log.Println(err) break } } } } return nil } //做快照 func SnapHour(tyin string) error { //read first line q := "select `time`, `ty` from tick_log where ty = ? order by `time` limit 1" row := db.QueryRow(q, tyin) var t int64 var ty string err := row.Scan(&t, &ty) if err != nil { log.Println("1") return err } begTime := getHourTime(t) if !inSanpTime(ty, begTime) { return ErrNotInSnapTime } //事务处理 q = "select data from tick_log where ty = ? and `time` >= ? and `time` <= ? order by `time`" endTime := begTime + 3600 * 1000 - 1 rows, err := db.Query(q, ty, begTime, endTime) if err != nil { return err } defer rows.Close() file, err := newTickFile(ty, begTime) if err != nil { return err } defer file.Close() var count int for rows.Next() { var o string count++ if count % 10000 == 0 { log.Println("read tick ...", count) } if err := rows.Scan(&o); err != nil { log.Println("3") return err } var m Market err := json.Unmarshal([]byte(o), &m) if err != nil { return err } err = file.AddTick(&m) if err != nil { return err } } if err := rows.Err(); err != nil { return err } //index and clear, index and clear in transaction err = file.Index() if err != nil { return err } return nil } func inSanpTime(ty string, i int64) bool { //这个starttime 是两个小时前的数据 i += 3600 * 1000 * 2 q := "select id from tick_log where `time` > ? and ty = ? order by `time` limit 1" row := db.QueryRow(q, i, ty) var id int64 err := row.Scan(&id) if err != nil { log.Println("4") return false } return id > 0 } type tickFile struct { ty string begTime int64 endTime int64 count int64 w *gzip.Writer file *os.File gobencoder *gob.Encoder path string } func newTickFile(ty string, begTime int64) (*tickFile, error) { o := &tickFile{} //begTime must big than max_endtime of the same type q := "select max(begtime) from tick_index where ty = ?" row := db.QueryRow(q, ty) var _max_endtime sql.NullInt64 var max_endtime int64 err := row.Scan(&_max_endtime) if err == sql.ErrNoRows { //do nothing } else if err != nil { return nil, err } if _max_endtime.Valid { max_endtime = _max_endtime.Int64 } if max_endtime > 0 && begTime < max_endtime { log.Println("delete some log") q := fmt.Sprintf("delete from tick_log where ty = '%s' and `time` < %d", ty, max_endtime+3600*1000 - 1) log.Println(q, begTime, max_endtime) _, err := db.Exec(q) if err != nil { return nil, err } return nil, ErrBegTime } o.begTime = begTime o.ty = ty ticktime := time.Unix(begTime / 1000, 0).Format("2006_01_02_15") filename := ty + "_" + ticktime + ".gz" path, err := getFilePath(ty) if path == "" || err != nil { return nil, err } os.MkdirAll(path, 0777) fullpath := path + "/" + filename file, err := os.Create(fullpath) if err != nil { return nil, err } o.path = fullpath o.w = gzip.NewWriter(file) o.file = file o.gobencoder = gob.NewEncoder(o.w) return o, err } func (tf *tickFile) Close() error { tf.w.Close() return tf.file.Close() } func (tf *tickFile) AddTick(m *Market) error { //check range if m.Timestamp < tf.begTime || m.Timestamp >= tf.begTime + 3600 * 1000 { return ErrTimeRange } if tf.count == 0 { tf.begTime = m.Timestamp tf.endTime = m.Timestamp } else { //check time order if m.Timestamp < tf.endTime { return ErrTimeOrder } tf.endTime = m.Timestamp } tf.count++ return tf.gobencoder.Encode(m) } func (tf *tickFile) Index() error { if tf.count == 0 { return ErrNoData } //要么成功,要么就是失败 tx, err := db.Begin() if err != nil { return err } q := fmt.Sprintf("insert into tick_index (begtime, endtime, path, ty, tickcount) values ('%d', '%d', '%s', '%s', '%d')", tf.begTime, tf.endTime, tf.path, tf.ty, tf.count) _, err = tx.Exec(q) if err != nil { tx.Rollback() return err } err = tf.updateTotalCount(tx, tf.ty) if err != nil { tx.Rollback() return err } err = tf.deleteLog(tx, tf.ty, tf.begTime, tf.endTime) if err != nil { tx.Rollback() return err } return tx.Commit() } func (tf *tickFile) updateTotalCount(tx *sql.Tx, ty string) error { q := "select id,tickcount,totalcount from tick_index where ty = '"+ty+"' and totalcount > 0 order by id desc limit 1" row := tx.QueryRow(q) var id int64 var tickcount int64 var totalcount int64 err := row.Scan(&id, &tickcount, &totalcount) if err == sql.ErrNoRows { totalcount = 0 } else if err != nil { return err } log.Println("last total count", totalcount) //update total count q = "select id, tickcount, totalcount from tick_index where ty = '"+ty+"' and totalcount = 0 order by id" data, err := fetchAll(tx, q) if err != nil { return err } for i := 0; i < len(data); i++ { totalcount += data[i].tickcount q = fmt.Sprintf("update tick_index set totalcount = '%d' where id = '%d'", totalcount, data[i].id) _, err := tx.Exec(q) if err != nil { return err } } return nil } type tickIndex struct { id int64 tickcount int64 totalcount int64 } func fetchAll(tx *sql.Tx, q string) ([]tickIndex, error) { rows, err := tx.Query(q) if err != nil { return nil, err } defer rows.Close() var ret []tickIndex for rows.Next() { var index tickIndex if err := rows.Scan(&index.id, &index.tickcount, &index.totalcount); err != nil { return nil, err } ret = append(ret, index) } if err := rows.Err(); err != nil { return nil, err } return ret, nil } func (tf *tickFile) deleteLog(tx *sql.Tx, ty string, begTime int64, endTime int64) error { _, err := tx.Exec("delete from tick_log where ty = ? and `time` >= ? and `time` <= ?", ty, begTime, endTime) return err }