123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626 |
- package store
- import "io"
- import "bufio"
- import "os"
- import "io/ioutil"
- import "strings"
- import "encoding/binary"
- import "encoding/json"
- import "errors"
- import "time"
- import "fmt"
- import "log"
- import "path"
- import "database/sql"
- import "strconv"
- import "compress/gzip"
- //import "sync"
const (
	// headerLen is the size in bytes of each data file's header:
	// last record time (int32) + readonly flag (int32) + record count (int32).
	headerLen = 12
	// fileDuration is the time span covered by one data file, in seconds
	// (one hour per file).
	fileDuration = 3600
)
// DataLocation describes the records stored in one data file: the first and
// last record times, the record count, and per-time data offsets.
// NOTE(review): not referenced by any live code in this chunk — only by
// commented-out index code; confirm before removing.
type DataLocation struct {
	start   int32           // time of the first record
	end     int32           // time of the last record
	count   int32           // number of records
	datlocs map[int32]int32 // record time -> offset within the file
}
// LoggingSave is the record contract the store persists. Records serialize
// to a fixed byte size (Size is used for truncation/seek arithmetic in
// NewSaveWriter, so it must be constant per type).
type LoggingSave interface {
	// GetTime returns the record's timestamp as unix seconds.
	GetTime() int32
	// GetData returns the record's serialized bytes (length == Size()).
	GetData() []byte
	// LoadData deserializes buf and returns the resulting record.
	LoadData([]byte) LoggingSave
	// Size returns the fixed serialized size of one record, in bytes.
	Size() int
	// GetId returns the record's identifier.
	GetId() int64
}
// byTimeLog implements sort.Interface, ordering records by ascending time.
type byTimeLog []LoggingSave

func (a byTimeLog) Len() int           { return len(a) }
func (a byTimeLog) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byTimeLog) Less(i, j int) bool { return a[i].GetTime() < a[j].GetTime() }
// Save persists LoggingSave records to hourly data files in time order.
type Save struct {
	lasttime  int32 // time of the most recently written record
	lastcount int32 // number of records written to the current file
	readonly  int32 // read-only flag: file is finished and must not be updated
	//posMap map[int32]map[int64]int64
	//posMut sync.Mutex
	cur      *os.File      // current hour's data file
	bufw     *bufio.Writer // buffered writer over cur
	path     string        // year/month/day directory currently written to
	basePath string        // root directory for this data type
	typ      string        // data type name (also the subdirectory name)
	zipCh    chan string   // finished files queued for gzip + DB indexing
	db       *sql.DB       // index database (tick_index table)
	//bDownload bool // whether history data is downloaded from an external source
}
//1. One data file per hour.
//2. Callers must guarantee only one goroutine writes at a time.
//3. Each file carries meta information in its header:
//   last record time (int32), readonly flag (int32), record count (int32), other flags (int64)
//   When the next file is created, the previous file's header is filled in.
- func NewSaveWriter(dir, typ string, bDownload bool, empty LoggingSave, db *sql.DB) (*Save, error) {
- dataPath := fmt.Sprintf("%s/%s", dir, typ)
- _, err := os.Stat(dataPath)
- if err != nil {
- err := os.MkdirAll(dataPath, 0777)
- if err != nil {
- return nil, err
- }
- }
- basePath := dataPath
- //if !bDownload {
- dataPath = getCurPath(dataPath)
- //}
- infos, err := ioutil.ReadDir(dataPath)
- if err != nil {
- return nil, err
- }
- s := &Save{}
- s.zipCh = make(chan string, 1024)
- s.path = dataPath
- s.basePath = basePath
- s.typ = typ
- s.db = db
- //s.posMap = make(map[int32]map[int64]int64)
- //s.bDownload = bDownload
- var files []string
- for i := 0; i < len(infos); i++ {
- name := infos[i].Name()
- if strings.HasSuffix(name, ".bin") {
- files = append(files, dataPath+"/"+name)
- }
- }
- if len(files) > 0 {
- //最后一个文件可能会在程序崩溃的时候没有正确的处理
- lastfile := files[len(files)-1]
- fp, err := os.Open(lastfile)
- if err != nil {
- return nil, err
- }
- info, err := fp.Stat()
- if err != nil {
- return nil, err
- }
- log.Println("info::", info.Size())
- if info.Size() < headerLen {
- fp.Close()
- os.Remove(lastfile)
- return s, nil
- }
- err = binary.Read(fp, binary.LittleEndian, &s.lasttime)
- if err != nil {
- return nil, err
- }
- err = binary.Read(fp, binary.LittleEndian, &s.readonly)
- if err != nil {
- return nil, err
- }
- log.Println("read header")
- if s.readonly == 0 {
- //检查文件的完整性
- log.Println("checking")
- onesize := empty.Size()
- truncate := info.Size() - (info.Size()-int64(headerLen))%int64(onesize)
- s.lastcount = int32((truncate - int64(headerLen)) / int64(onesize))
- if info.Size() != truncate {
- fp.Close()
- fp, _ = os.OpenFile(lastfile, os.O_WRONLY, 0777)
- err := fp.Truncate(truncate)
- if err != nil {
- log.Println("truncate", err, truncate, info.Size(), s.lastcount)
- fp.Close()
- return nil, err
- }
- fp.Close()
- fp, _ = os.Open(lastfile)
- }
- log.Println("fixing")
- //修复最后一个数据的时间
- if s.lastcount > 0 {
- fp.Seek(truncate-int64(empty.Size()), os.SEEK_SET)
- buf := make([]byte, empty.Size())
- io.ReadFull(fp, buf)
- s.lasttime = empty.LoadData(buf).GetTime()
- }
- log.Println("open current")
- s.cur.Close()
- s.cur, err = os.OpenFile(lastfile, os.O_APPEND|os.O_WRONLY, 0777)
- s.bufw = bufio.NewWriter(s.cur)
- //s.indexingFile(lastfile, empty)
- }
- }
- go s.ZipAndSort(empty)
- return s, nil
- }
- func (s *Save) Save(data LoggingSave) error {
- t := data.GetTime()
- lastT := (s.lasttime / fileDuration) * fileDuration
- if lastT > t { //必须不能比base time 还要小,没有完全严格的顺序
- //log.Println(lastT, t)
- return errors.New("ErrTime")
- }
- baseT := (t / fileDuration) * fileDuration
- if lastT == baseT {
- if s.bufw == nil {
- log.Println("no writer", s.typ)
- return errors.New("no writer")
- }
- n, err := s.bufw.Write(data.GetData())
- if err != nil {
- log.Println("write data err", err)
- return err
- }
- //log.Println("write", n)
- if n != data.Size() {
- errinfo := fmt.Sprintf("1,n:%d, size:%d", n, data.Size())
- return errors.New(errinfo)
- }
- s.lasttime = t
- s.lastcount++
- /*id := data.GetId()
- s.posMut.Lock()
- _, ok := s.posMap[t]
- if !ok {
- s.posMap[t] = make(map[int64]int64)
- }
- s.posMap[t][id] = int64(headerLen + (s.lastcount-1)*int32(data.Size()))
- s.posMut.Unlock()*/
- return nil
- }
- //not the same, 新建文件,原来的文件被冻结不再保存数据(一小部分数据可能丢失)
- if s.cur != nil {
- fname := s.cur.Name()
- s.Flush()
- s.cur.Close()
- s.cur, _ = os.OpenFile(fname, os.O_WRONLY, 0777)
- //_, err := s.cur.Seek(0, os.SEEK_SET)
- //if err != nil {
- //return err
- //}
- err := binary.Write(s.cur, binary.LittleEndian, s.lasttime)
- if err != nil {
- return err
- }
- s.readonly = 1
- err = binary.Write(s.cur, binary.LittleEndian, s.readonly)
- if err != nil {
- return err
- }
- err = binary.Write(s.cur, binary.LittleEndian, s.lastcount)
- if err != nil {
- return err
- }
- s.cur.Close()
- s.zipCh <- fname
- }
- //文件名用base time 生成 baseT
- name := s.getFileName(int64(baseT))
- s.cur, _ = os.Create(name)
- s.bufw = bufio.NewWriter(s.cur)
- s.lastcount = 0
- s.lasttime = 0
- s.readonly = 0
- //s.files[baseT] = name
- //if s.bDownload { //服务器会从外部下载历史数据,所以本地只需保留一天的数据
- //for len(s.files) > 24 {
- //os.Remove(s.files[0])
- ////delete(s.filelocations, s.files[0])
- //s.files = s.files[1:]
- //}
- //}
- err := binary.Write(s.cur, binary.LittleEndian, s.lasttime)
- if err != nil {
- return err
- }
- err = binary.Write(s.cur, binary.LittleEndian, s.readonly)
- if err != nil {
- return err
- }
- err = binary.Write(s.cur, binary.LittleEndian, s.lastcount)
- if err != nil {
- return err
- }
- n, err := s.bufw.Write(data.GetData())
- if err != nil {
- return err
- }
- if n != data.Size() {
- errinfo := fmt.Sprintf("2,n:%d, size:%d", n, data.Size())
- return errors.New(errinfo)
- }
- s.lasttime = t
- s.lastcount++
- /*id := data.GetId()
- s.posMut.Lock()
- s.posMap = nil
- s.posMap = make(map[int32]map[int64]int64)
- s.posMap[t] = make(map[int64]int64)
- s.posMap[t][id] = headerLen
- s.posMut.Unlock()*/
- return nil
- }
- func (s *Save) Close() {
- s.cur.Close()
- }
- func (s *Save) Flush() {
- s.bufw.Flush()
- s.cur.Sync()
- }
- func (s *Save) getFileName(baseT int64) string {
- t := time.Unix(baseT, 0)
- monthStr := fmt.Sprintf("%d", t.Month())
- dir := path.Join(s.basePath, fmt.Sprint(t.Year()), monthStr, fmt.Sprint(t.Day()))
- if dir != s.path {
- os.MkdirAll(dir, 0777)
- s.path = dir
- }
- fname := s.path + "/" + time.Unix(baseT, 0).Format("2006-01-02-15-04-05") + ".bin"
- return fname
- }
- /*func (s *Save) indexingFile(fname string, empty LoggingSave) error {
- fp, err := os.Open(fname)
- if err != nil {
- return err
- }
- defer fp.Close()
- var lasttime, readonly, lastcount int32
- err = binary.Read(fp, binary.LittleEndian, &lasttime)
- if err != nil {
- return err
- }
- err = binary.Read(fp, binary.LittleEndian, &readonly)
- if err != nil {
- return err
- }
- err = binary.Read(fp, binary.LittleEndian, &lastcount)
- if err != nil {
- return err
- }
- pos := headerLen
- buf := make([]byte, empty.Size())
- for err == nil {
- _, err = io.ReadFull(fp, buf)
- if err != nil {
- if err == io.EOF {
- return nil
- } else {
- return err
- }
- }
- data := empty.LoadData(buf)
- time := data.GetTime()
- id := data.GetId()
- _, ok := s.posMap[time]
- if !ok {
- s.posMap[time] = make(map[int64]int64)
- }
- s.posMap[time][id] = int64(pos)
- pos += empty.Size()
- }
- return nil
- }*/
- func (s *Save) saveDB(start, end, count int32, fname string) error {
- //要么成功,要么就是失败
- tx, err := s.db.Begin()
- if err != nil {
- return err
- }
- q := fmt.Sprintf("insert into tick_index (begtime, endtime, path, ty, tickcount, totalcount) values ('%d', '%d', '%s', '%s', '%d', '%d')",
- start, end, fname, s.typ, count, 0)
- _, err = tx.Exec(q)
- //log.Println("debug saveDB", q)
- if err != nil {
- tx.Rollback()
- return err
- }
- return tx.Commit()
- }
- func zipAndSort(fname string, empty LoggingSave, start, end, count *int32) (string, error) {
- fp, err := os.Open(fname)
- if err != nil {
- return "", err
- }
- defer fp.Close()
- var lasttime, readonly, lastcount int32
- err = binary.Read(fp, binary.LittleEndian, &lasttime)
- if err != nil {
- return "", err
- }
- err = binary.Read(fp, binary.LittleEndian, &readonly)
- if err != nil {
- return "", err
- }
- err = binary.Read(fp, binary.LittleEndian, &lastcount)
- if err != nil {
- return "", err
- }
- buf := make([]byte, empty.Size())
- _, err = io.ReadFull(fp, buf)
- data := empty.LoadData(buf)
- *start = data.GetTime()
- *end = lasttime
- *count = lastcount
- fnameZip := strings.Replace(fname, ".bin", ".gz", 1)
- fpz, err := os.Create(fnameZip)
- if err != nil {
- return "", err
- }
- defer fpz.Close()
- gw := gzip.NewWriter(fpz)
- defer gw.Close()
- _, err = fp.Seek(0, os.SEEK_SET)
- _, err = io.Copy(gw, fp)
- if err != nil {
- if err != io.EOF {
- return "", err
- }
- }
- gw.Flush()
- fp.Close()
- os.Remove(fname)
- return fnameZip, nil
- }
- func (s *Save) ZipAndSort(empty LoggingSave) {
- for {
- fname := <-s.zipCh
- var start, end, count int32
- fnameZip, err := zipAndSort(fname, empty, &start, &end, &count)
- if fnameZip == "" {
- log.Println("ZipAndSort", fname, err)
- } else {
- err := s.saveDB(start, end, count, fnameZip)
- if err != nil {
- log.Println("debug ZipAndSort", err)
- }
- }
- }
- }
- func (s *Save) PrintDataIndex() {
- //for fname, datloc := range s.filelocations {
- //log.Println(fname, datloc.count, datloc.start, datloc.end)
- //for k, v := range datloc.datlocs {
- //log.Println(k, v)
- //}
- //}
- //for k, v := range s.timefiles {
- //log.Println(k, v)
- //}
- //for i, v := range s.files {
- //log.Println(i, v)
- //}
- }
// getBaseTime extracts the base unix time encoded in a data file's name
// (layout 2006-01-02-15-04-05, interpreted in UTC+8 — presumably the
// server's local zone; confirm). Returns 0 when the name does not parse;
// previously the parse error was ignored and the zero time's Unix value was
// truncated to a garbage int32.
func getBaseTime(fname string) int32 {
	_, name := path.Split(fname)
	shortname := strings.Split(name, ".")[0] + "+08:00"
	t, err := time.Parse("2006-01-02-15-04-05Z07:00", shortname)
	if err != nil {
		return 0
	}
	return int32(t.Unix())
}
// GetData is currently a stub and always returns nil: the position-index
// based implementation is commented out below. The parameters are kept so
// callers compile unchanged.
func (s *Save) GetData(start, end int64, offset, count int, empty LoggingSave) []LoggingSave {
	return nil
}
- /*func (s *Save) GetData(start, end int64, offset, count int, empty LoggingSave) []LoggingSave {
- var datas []LoggingSave
- pos := int64(-1)
- if start == 0 {
- pos = headerLen
- } else {
- id := end
- s.posMut.Lock()
- poss, ok := s.posMap[int32(start)]
- if ok {
- pos = poss[id]
- }
- s.posMut.Unlock()
- }
- if pos == -1 {
- return nil
- }
- onesize := empty.Size()
- buf := make([]byte, onesize)
- if s.cur == nil {
- return nil
- }
- fp, err := os.Open(s.cur.Name())
- if err != nil {
- log.Println("GetData.Open", err)
- return datas
- }
- defer fp.Close()
- _, err = fp.Seek(int64(pos), os.SEEK_SET)
- if err != nil {
- log.Println("GetData.Seek", err)
- return datas
- }
- if offset > 0 {
- pos, err := fp.Seek(0, os.SEEK_CUR)
- if err != nil {
- log.Println("GetData.Seek", err)
- return datas
- }
- fi, err := fp.Stat()
- if err != nil {
- log.Println("GetData.Stat", err)
- return datas
- }
- datnum := int((fi.Size() - pos)) / onesize
- if datnum > offset {
- _, err = fp.Seek(int64(offset*onesize), os.SEEK_CUR)
- if err != nil {
- log.Println("GetData.Seek", err)
- return datas
- }
- offset = 0
- } else {
- return datas
- }
- }
- for err == nil {
- _, err = io.ReadFull(fp, buf)
- if err != nil {
- if err != io.EOF {
- log.Println("GetData.read.data", err)
- }
- return datas
- }
- data := empty.LoadData(buf)
- if count > 0 {
- datas = append(datas, data)
- count--
- if count == 0 {
- return datas
- }
- }
- }
- return datas
- }*/
- func ReadIndex(fname string) ([fileDuration]int64, error) {
- var indexs [fileDuration]int64
- fnameIdx := strings.Replace(fname, ".bin", ".idx", 1)
- f, err := os.Open(fnameIdx)
- if err != nil {
- return indexs, err
- }
- defer f.Close()
- dec := json.NewDecoder(f)
- err = dec.Decode(&indexs)
- if err != nil {
- return indexs, err
- }
- return indexs, nil
- }
// getCurPath locates the newest year/month/day subdirectory under dataPath.
// Directory names are decimal numbers; the largest value wins at each
// level. If any level is missing or unreadable it falls back to today's
// year/month/day path. The returned directory is created if necessary.
//
// The previous version repeated the same scan verbatim for year, month and
// day (and carried an unreachable tail after the final else); the three
// levels are now one loop with identical behavior.
func getCurPath(dataPath string) string {
	now := time.Now()
	nowpath := path.Join(dataPath, fmt.Sprint(now.Year()),
		fmt.Sprintf("%d", now.Month()), fmt.Sprint(now.Day()))
	cur := dataPath
	// Walk three levels: year, month, day.
	for level := 0; level < 3; level++ {
		infos, err := ioutil.ReadDir(cur)
		if err != nil {
			os.MkdirAll(nowpath, 0777)
			return nowpath
		}
		best, bestNum := "", 0
		for _, v := range infos {
			if !v.IsDir() {
				continue
			}
			// Non-numeric names parse as 0 and are never selected.
			if n, _ := strconv.Atoi(v.Name()); n > bestNum {
				best, bestNum = v.Name(), n
			}
		}
		if best == "" {
			os.MkdirAll(nowpath, 0777)
			return nowpath
		}
		cur = path.Join(cur, best)
	}
	os.MkdirAll(cur, 0777)
	return cur
}
|