123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- package tick
- //snap last hour ticks
- import "errors"
- import "encoding/json"
- import "encoding/gob"
- import "os"
- import "compress/gzip"
- import "database/sql"
- import "time"
- import "log"
- import "fmt"
// Sentinel errors returned by the snapshot routines in this file.
var (
	// ErrNotInSnapTime is returned by SnapHour when inSanpTime reports that
	// the candidate hour may not be snapshotted yet.
	ErrNotInSnapTime = errors.New("ErrNotInSnapTime")

	// ErrBegTime is returned by newTickFile when the requested hour starts
	// before the newest begtime already recorded in tick_index.
	ErrBegTime = errors.New("ErrBegTime")
)
// getHourTime truncates the millisecond timestamp t to a multiple of one
// hour (3600*1000 ms), rounding toward zero.
func getHourTime(t int64) int64 {
	const hourMillis = 3600 * 1000
	return t - t%hourMillis
}
- func SnapAll() error {
- for name, dsConf := range serverconf.DsMap {
- if dsConf.Run {
- for {
- err := SnapHour(name)
- if err != nil {
- log.Println(err)
- break
- }
- }
- }
- }
- return nil
- }
- //做快照
- func SnapHour(tyin string) error {
- //read first line
- q := "select `time`, `ty` from tick_log where ty = ? order by `time` limit 1"
- row := db.QueryRow(q, tyin)
- var t int64
- var ty string
- err := row.Scan(&t, &ty)
- if err != nil {
- log.Println("1")
- return err
- }
- begTime := getHourTime(t)
- if !inSanpTime(ty, begTime) {
- return ErrNotInSnapTime
- }
- //事务处理
- q = "select data from tick_log where ty = ? and `time` >= ? and `time` <= ? order by `time`"
- endTime := begTime + 3600 * 1000 - 1
- rows, err := db.Query(q, ty, begTime, endTime)
- if err != nil {
- return err
- }
- defer rows.Close()
- file, err := newTickFile(ty, begTime)
- if err != nil {
- return err
- }
- defer file.Close()
- var count int
- for rows.Next() {
- var o string
- count++
- if count % 10000 == 0 {
- log.Println("read tick ...", count)
- }
- if err := rows.Scan(&o); err != nil {
- log.Println("3")
- return err
- }
- var m Market
- err := json.Unmarshal([]byte(o), &m)
- if err != nil {
- return err
- }
- err = file.AddTick(&m)
- if err != nil {
- return err
- }
- }
- if err := rows.Err(); err != nil {
- return err
- }
- //index and clear, index and clear in transaction
- err = file.Index()
- if err != nil {
- return err
- }
- return nil
- }
- func inSanpTime(ty string, i int64) bool {
- //这个starttime 是两个小时前的数据
- i += 3600 * 1000 * 2
- q := "select id from tick_log where `time` > ? and ty = ? order by `time` limit 1"
- row := db.QueryRow(q, i, ty)
- var id int64
- err := row.Scan(&id)
- if err != nil {
- log.Println("4")
- return false
- }
- return id > 0
- }
// tickFile is a gzip-compressed, gob-encoded snapshot file that is being
// written for one tick type.
type tickFile struct {
	// ty is the tick type the file belongs to.
	ty string
	// begTime is set by newTickFile to the requested start time and is
	// overwritten by AddTick with the timestamp of the first tick.
	begTime int64
	// endTime is the timestamp of the most recently added tick.
	endTime int64
	// count is the number of ticks passed to the encoder so far.
	count int64
	// w compresses everything written to file.
	w *gzip.Writer
	// file is the output file on disk.
	file *os.File
	// gobencoder encodes ticks into w.
	gobencoder *gob.Encoder
	// path is the full path of file; Index stores it in tick_index.
	path string
}
- func newTickFile(ty string, begTime int64) (*tickFile, error) {
- o := &tickFile{}
- //begTime must big than max_endtime of the same type
- q := "select max(begtime) from tick_index where ty = ?"
- row := db.QueryRow(q, ty)
- var _max_endtime sql.NullInt64
- var max_endtime int64
- err := row.Scan(&_max_endtime)
- if err == sql.ErrNoRows {
- //do nothing
- } else if err != nil {
- return nil, err
- }
- if _max_endtime.Valid {
- max_endtime = _max_endtime.Int64
- }
- if max_endtime > 0 && begTime < max_endtime {
- log.Println("delete some log")
- q := fmt.Sprintf("delete from tick_log where ty = '%s' and `time` < %d", ty, max_endtime+3600*1000 - 1)
- log.Println(q, begTime, max_endtime)
- _, err := db.Exec(q)
- if err != nil {
- return nil, err
- }
- return nil, ErrBegTime
- }
- o.begTime = begTime
- o.ty = ty
- ticktime := time.Unix(begTime / 1000, 0).Format("2006_01_02_15")
- filename := ty + "_" + ticktime + ".gz"
- path, err := getFilePath(ty)
- if path == "" || err != nil {
- return nil, err
- }
- os.MkdirAll(path, 0777)
- fullpath := path + "/" + filename
- file, err := os.Create(fullpath)
- if err != nil {
- return nil, err
- }
- o.path = fullpath
- o.w = gzip.NewWriter(file)
- o.file = file
- o.gobencoder = gob.NewEncoder(o.w)
- return o, err
- }
- func (tf *tickFile) Close() error {
- tf.w.Close()
- return tf.file.Close()
- }
- func (tf *tickFile) AddTick(m *Market) error {
- //check range
- if m.Timestamp < tf.begTime || m.Timestamp >= tf.begTime + 3600 * 1000 {
- return ErrTimeRange
- }
- if tf.count == 0 {
- tf.begTime = m.Timestamp
- tf.endTime = m.Timestamp
- } else {
- //check time order
- if m.Timestamp < tf.endTime {
- return ErrTimeOrder
- }
- tf.endTime = m.Timestamp
- }
- tf.count++
- return tf.gobencoder.Encode(m)
- }
- func (tf *tickFile) Index() error {
- if tf.count == 0 {
- return ErrNoData
- }
- //要么成功,要么就是失败
- tx, err := db.Begin()
- if err != nil {
- return err
- }
- q := fmt.Sprintf("insert into tick_index (begtime, endtime, path, ty, tickcount) values ('%d', '%d', '%s', '%s', '%d')",
- tf.begTime, tf.endTime, tf.path, tf.ty, tf.count)
- _, err = tx.Exec(q)
- if err != nil {
- tx.Rollback()
- return err
- }
- err = tf.updateTotalCount(tx, tf.ty)
- if err != nil {
- tx.Rollback()
- return err
- }
- err = tf.deleteLog(tx, tf.ty, tf.begTime, tf.endTime)
- if err != nil {
- tx.Rollback()
- return err
- }
- return tx.Commit()
- }
- func (tf *tickFile) updateTotalCount(tx *sql.Tx, ty string) error {
- q := "select id,tickcount,totalcount from tick_index where ty = '"+ty+"' and totalcount > 0 order by id desc limit 1"
- row := tx.QueryRow(q)
- var id int64
- var tickcount int64
- var totalcount int64
- err := row.Scan(&id, &tickcount, &totalcount)
- if err == sql.ErrNoRows {
- totalcount = 0
- } else if err != nil {
- return err
- }
- log.Println("last total count", totalcount)
- //update total count
- q = "select id, tickcount, totalcount from tick_index where ty = '"+ty+"' and totalcount = 0 order by id"
- data, err := fetchAll(tx, q)
- if err != nil {
- return err
- }
- for i := 0; i < len(data); i++ {
- totalcount += data[i].tickcount
- q = fmt.Sprintf("update tick_index set totalcount = '%d' where id = '%d'", totalcount, data[i].id)
- _, err := tx.Exec(q)
- if err != nil {
- return err
- }
- }
- return nil
- }
// tickIndex holds the three tick_index columns read by fetchAll.
type tickIndex struct {
	// id is the row id.
	id int64
	// tickcount is the number of ticks recorded for the row.
	tickcount int64
	// totalcount is the running total; updateTotalCount treats 0 as "not
	// computed yet".
	totalcount int64
}
- func fetchAll(tx *sql.Tx, q string) ([]tickIndex, error) {
- rows, err := tx.Query(q)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- var ret []tickIndex
- for rows.Next() {
- var index tickIndex
- if err := rows.Scan(&index.id, &index.tickcount, &index.totalcount); err != nil {
- return nil, err
- }
- ret = append(ret, index)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return ret, nil
- }
- func (tf *tickFile) deleteLog(tx *sql.Tx, ty string, begTime int64, endTime int64) error {
- _, err := tx.Exec("delete from tick_log where ty = ? and `time` >= ? and `time` <= ?", ty, begTime, endTime)
- return err
- }
|