package store

import "io"
import "bufio"
import "os"
import "io/ioutil"
import "strings"
import "encoding/binary"
import "encoding/json"
import "errors"
import "time"
import "fmt"
import "log"
import "path"
import "database/sql"
import "strconv"
import "compress/gzip"

//import "sync"

const (
	// headerLen is the size in bytes of the header at the start of every data
	// file. NewSaveWriter discards a file that is shorter than this, and the
	// writer stores three int32 values (3 * 4 bytes) in it.
	headerLen = 12
	// fileDuration is the width of one file's time bucket: Save rounds record
	// timestamps down to a multiple of this value to decide which file a record
	// belongs to. getFileName feeds the rounded value to time.Unix, i.e. it is
	// treated as seconds (3600 = one hour).
	fileDuration = 3600
)

// DataLocation describes a range of stored data (start, end, count) together
// with a map of int32 keys to int32 values.
// NOTE(review): it is only mentioned in commented-out code in this file —
// confirm whether it is still needed.
type DataLocation struct {
	start   int32
	end     int32
	count   int32
	datlocs map[int32]int32
}

// LoggingSave is a fixed-size record that can be stored by Save.
type LoggingSave interface {
	// GetTime returns the record's timestamp; Save uses it to pick the file
	// and to reject records older than the current file.
	GetTime() int32
	// GetData returns the serialized record; Save expects exactly Size() bytes
	// to be written for it.
	GetData() []byte
	// LoadData decodes a record from a buffer of Size() bytes and returns it.
	LoadData([]byte) LoggingSave
	// Size returns the serialized size of one record in bytes.
	Size() int
	// GetId returns the record's identifier.
	// NOTE(review): only referenced from commented-out code in this file.
	GetId() int64
}

// byTimeLog orders records by ascending GetTime. It provides the
// Len/Swap/Less method set that sort.Interface expects.
type byTimeLog []LoggingSave

// Len returns the number of records.
func (a byTimeLog) Len() int { return len(a) }

// Swap exchanges the records at positions i and j.
func (a byTimeLog) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether record i has an earlier time than record j.
func (a byTimeLog) Less(i, j int) bool { return a[i].GetTime() < a[j].GetTime() }

// Save stores and manages data in time order.
type Save struct {
	lasttime  int32 // time of the last record written to the current file
	lastcount int32 // number of records in the current file
	readonly  int32 // read-only flag: the file has been completely written and must not be updated any more
	//posMap map[int32]map[int64]int64
	//posMut sync.Mutex
	cur      *os.File      // file currently being written, nil when there is none
	bufw     *bufio.Writer // buffered writer on top of cur
	path     string        // directory that holds the current file
	basePath string        // root directory (dir/typ) below which the dated directories live
	typ      string        // data type name; used as directory name and as the "ty" column in saveDB
	zipCh    chan string   // names of finished files queued for ZipAndSort
	db       *sql.DB       // database that receives one tick_index row per compressed file
	//bDownload bool // whether historical data is downloaded from an external source
}

// 1. One file is written per hour.
// 2. Callers must make sure that only one thread is writing at a time.
// 3.
表示这个文件的meta信息: // 最后一个数据的时间(int32),是否只读(int32), 数据的数目(int32) 其他标记(int64) // 在创建下一个文件的时候,把上一个文件的这些信息补全。 func NewSaveWriter(dir, typ string, bDownload bool, empty LoggingSave, db *sql.DB) (*Save, error) { dataPath := fmt.Sprintf("%s/%s", dir, typ) _, err := os.Stat(dataPath) if err != nil { err := os.MkdirAll(dataPath, 0777) if err != nil { return nil, err } } basePath := dataPath //if !bDownload { dataPath = getCurPath(dataPath) //} infos, err := ioutil.ReadDir(dataPath) if err != nil { return nil, err } s := &Save{} s.zipCh = make(chan string, 1024) s.path = dataPath s.basePath = basePath s.typ = typ s.db = db //s.posMap = make(map[int32]map[int64]int64) //s.bDownload = bDownload var files []string for i := 0; i < len(infos); i++ { name := infos[i].Name() if strings.HasSuffix(name, ".bin") { files = append(files, dataPath+"/"+name) } } if len(files) > 0 { //最后一个文件可能会在程序崩溃的时候没有正确的处理 lastfile := files[len(files)-1] fp, err := os.Open(lastfile) if err != nil { return nil, err } info, err := fp.Stat() if err != nil { return nil, err } log.Println("info::", info.Size()) if info.Size() < headerLen { fp.Close() os.Remove(lastfile) return s, nil } err = binary.Read(fp, binary.LittleEndian, &s.lasttime) if err != nil { return nil, err } err = binary.Read(fp, binary.LittleEndian, &s.readonly) if err != nil { return nil, err } log.Println("read header") if s.readonly == 0 { //检查文件的完整性 log.Println("checking") onesize := empty.Size() truncate := info.Size() - (info.Size()-int64(headerLen))%int64(onesize) s.lastcount = int32((truncate - int64(headerLen)) / int64(onesize)) if info.Size() != truncate { fp.Close() fp, _ = os.OpenFile(lastfile, os.O_WRONLY, 0777) err := fp.Truncate(truncate) if err != nil { log.Println("truncate", err, truncate, info.Size(), s.lastcount) fp.Close() return nil, err } fp.Close() fp, _ = os.Open(lastfile) } log.Println("fixing") //修复最后一个数据的时间 if s.lastcount > 0 { fp.Seek(truncate-int64(empty.Size()), os.SEEK_SET) buf := make([]byte, empty.Size()) 
io.ReadFull(fp, buf) s.lasttime = empty.LoadData(buf).GetTime() } log.Println("open current") s.cur.Close() s.cur, err = os.OpenFile(lastfile, os.O_APPEND|os.O_WRONLY, 0777) s.bufw = bufio.NewWriter(s.cur) //s.indexingFile(lastfile, empty) } } go s.ZipAndSort(empty) return s, nil } func (s *Save) Save(data LoggingSave) error { t := data.GetTime() lastT := (s.lasttime / fileDuration) * fileDuration if lastT > t { //必须不能比base time 还要小,没有完全严格的顺序 //log.Println(lastT, t) return errors.New("ErrTime") } baseT := (t / fileDuration) * fileDuration if lastT == baseT { if s.bufw == nil { log.Println("no writer", s.typ) return errors.New("no writer") } n, err := s.bufw.Write(data.GetData()) if err != nil { log.Println("write data err", err) return err } //log.Println("write", n) if n != data.Size() { errinfo := fmt.Sprintf("1,n:%d, size:%d", n, data.Size()) return errors.New(errinfo) } s.lasttime = t s.lastcount++ /*id := data.GetId() s.posMut.Lock() _, ok := s.posMap[t] if !ok { s.posMap[t] = make(map[int64]int64) } s.posMap[t][id] = int64(headerLen + (s.lastcount-1)*int32(data.Size())) s.posMut.Unlock()*/ return nil } //not the same, 新建文件,原来的文件被冻结不再保存数据(一小部分数据可能丢失) if s.cur != nil { fname := s.cur.Name() s.Flush() s.cur.Close() s.cur, _ = os.OpenFile(fname, os.O_WRONLY, 0777) //_, err := s.cur.Seek(0, os.SEEK_SET) //if err != nil { //return err //} err := binary.Write(s.cur, binary.LittleEndian, s.lasttime) if err != nil { return err } s.readonly = 1 err = binary.Write(s.cur, binary.LittleEndian, s.readonly) if err != nil { return err } err = binary.Write(s.cur, binary.LittleEndian, s.lastcount) if err != nil { return err } s.cur.Close() s.zipCh <- fname } //文件名用base time 生成 baseT name := s.getFileName(int64(baseT)) s.cur, _ = os.Create(name) s.bufw = bufio.NewWriter(s.cur) s.lastcount = 0 s.lasttime = 0 s.readonly = 0 //s.files[baseT] = name //if s.bDownload { //服务器会从外部下载历史数据,所以本地只需保留一天的数据 //for len(s.files) > 24 { //os.Remove(s.files[0]) ////delete(s.filelocations, 
s.files[0]) //s.files = s.files[1:] //} //} err := binary.Write(s.cur, binary.LittleEndian, s.lasttime) if err != nil { return err } err = binary.Write(s.cur, binary.LittleEndian, s.readonly) if err != nil { return err } err = binary.Write(s.cur, binary.LittleEndian, s.lastcount) if err != nil { return err } n, err := s.bufw.Write(data.GetData()) if err != nil { return err } if n != data.Size() { errinfo := fmt.Sprintf("2,n:%d, size:%d", n, data.Size()) return errors.New(errinfo) } s.lasttime = t s.lastcount++ /*id := data.GetId() s.posMut.Lock() s.posMap = nil s.posMap = make(map[int32]map[int64]int64) s.posMap[t] = make(map[int64]int64) s.posMap[t][id] = headerLen s.posMut.Unlock()*/ return nil } func (s *Save) Close() { s.cur.Close() } func (s *Save) Flush() { s.bufw.Flush() s.cur.Sync() } func (s *Save) getFileName(baseT int64) string { t := time.Unix(baseT, 0) monthStr := fmt.Sprintf("%d", t.Month()) dir := path.Join(s.basePath, fmt.Sprint(t.Year()), monthStr, fmt.Sprint(t.Day())) if dir != s.path { os.MkdirAll(dir, 0777) s.path = dir } fname := s.path + "/" + time.Unix(baseT, 0).Format("2006-01-02-15-04-05") + ".bin" return fname } /*func (s *Save) indexingFile(fname string, empty LoggingSave) error { fp, err := os.Open(fname) if err != nil { return err } defer fp.Close() var lasttime, readonly, lastcount int32 err = binary.Read(fp, binary.LittleEndian, &lasttime) if err != nil { return err } err = binary.Read(fp, binary.LittleEndian, &readonly) if err != nil { return err } err = binary.Read(fp, binary.LittleEndian, &lastcount) if err != nil { return err } pos := headerLen buf := make([]byte, empty.Size()) for err == nil { _, err = io.ReadFull(fp, buf) if err != nil { if err == io.EOF { return nil } else { return err } } data := empty.LoadData(buf) time := data.GetTime() id := data.GetId() _, ok := s.posMap[time] if !ok { s.posMap[time] = make(map[int64]int64) } s.posMap[time][id] = int64(pos) pos += empty.Size() } return nil }*/ func (s *Save) saveDB(start, 
end, count int32, fname string) error { //要么成功,要么就是失败 tx, err := s.db.Begin() if err != nil { return err } q := fmt.Sprintf("insert into tick_index (begtime, endtime, path, ty, tickcount, totalcount) values ('%d', '%d', '%s', '%s', '%d', '%d')", start, end, fname, s.typ, count, 0) _, err = tx.Exec(q) //log.Println("debug saveDB", q) if err != nil { tx.Rollback() return err } return tx.Commit() } func zipAndSort(fname string, empty LoggingSave, start, end, count *int32) (string, error) { fp, err := os.Open(fname) if err != nil { return "", err } defer fp.Close() var lasttime, readonly, lastcount int32 err = binary.Read(fp, binary.LittleEndian, &lasttime) if err != nil { return "", err } err = binary.Read(fp, binary.LittleEndian, &readonly) if err != nil { return "", err } err = binary.Read(fp, binary.LittleEndian, &lastcount) if err != nil { return "", err } buf := make([]byte, empty.Size()) _, err = io.ReadFull(fp, buf) data := empty.LoadData(buf) *start = data.GetTime() *end = lasttime *count = lastcount fnameZip := strings.Replace(fname, ".bin", ".gz", 1) fpz, err := os.Create(fnameZip) if err != nil { return "", err } defer fpz.Close() gw := gzip.NewWriter(fpz) defer gw.Close() _, err = fp.Seek(0, os.SEEK_SET) _, err = io.Copy(gw, fp) if err != nil { if err != io.EOF { return "", err } } gw.Flush() fp.Close() os.Remove(fname) return fnameZip, nil } func (s *Save) ZipAndSort(empty LoggingSave) { for { fname := <-s.zipCh var start, end, count int32 fnameZip, err := zipAndSort(fname, empty, &start, &end, &count) if fnameZip == "" { log.Println("ZipAndSort", fname, err) } else { err := s.saveDB(start, end, count, fnameZip) if err != nil { log.Println("debug ZipAndSort", err) } } } } func (s *Save) PrintDataIndex() { //for fname, datloc := range s.filelocations { //log.Println(fname, datloc.count, datloc.start, datloc.end) //for k, v := range datloc.datlocs { //log.Println(k, v) //} //} //for k, v := range s.timefiles { //log.Println(k, v) //} //for i, v := range 
s.files { //log.Println(i, v) //} } func getBaseTime(fname string) int32 { _, name := path.Split(fname) shortname := strings.Split(name, ".")[0] shortname += "+08:00" t, _ := time.Parse("2006-01-02-15-04-05Z07:00", shortname) return int32(t.Unix()) } func (s *Save) GetData(start, end int64, offset, count int, empty LoggingSave) []LoggingSave { return nil } /*func (s *Save) GetData(start, end int64, offset, count int, empty LoggingSave) []LoggingSave { var datas []LoggingSave pos := int64(-1) if start == 0 { pos = headerLen } else { id := end s.posMut.Lock() poss, ok := s.posMap[int32(start)] if ok { pos = poss[id] } s.posMut.Unlock() } if pos == -1 { return nil } onesize := empty.Size() buf := make([]byte, onesize) if s.cur == nil { return nil } fp, err := os.Open(s.cur.Name()) if err != nil { log.Println("GetData.Open", err) return datas } defer fp.Close() _, err = fp.Seek(int64(pos), os.SEEK_SET) if err != nil { log.Println("GetData.Seek", err) return datas } if offset > 0 { pos, err := fp.Seek(0, os.SEEK_CUR) if err != nil { log.Println("GetData.Seek", err) return datas } fi, err := fp.Stat() if err != nil { log.Println("GetData.Stat", err) return datas } datnum := int((fi.Size() - pos)) / onesize if datnum > offset { _, err = fp.Seek(int64(offset*onesize), os.SEEK_CUR) if err != nil { log.Println("GetData.Seek", err) return datas } offset = 0 } else { return datas } } for err == nil { _, err = io.ReadFull(fp, buf) if err != nil { if err != io.EOF { log.Println("GetData.read.data", err) } return datas } data := empty.LoadData(buf) if count > 0 { datas = append(datas, data) count-- if count == 0 { return datas } } } return datas }*/ func ReadIndex(fname string) ([fileDuration]int64, error) { var indexs [fileDuration]int64 fnameIdx := strings.Replace(fname, ".bin", ".idx", 1) f, err := os.Open(fnameIdx) if err != nil { return indexs, err } defer f.Close() dec := json.NewDecoder(f) err = dec.Decode(&indexs) if err != nil { return indexs, err } return indexs, nil } 
// getCurPath returns the most recent dated directory below dataPath.
//
// The directory tree is laid out as <dataPath>/<year>/<month>/<day>. At each
// of the three levels the sub-directory whose name is the largest positive
// integer is chosen. If any level has no such sub-directory, or cannot be
// read, the directory for today's date (local time) is created and returned
// instead.
func getCurPath(dataPath string) string {
	now := time.Now()
	// %d is required for the month: fmt.Sprint would print its English name.
	monthStr := fmt.Sprintf("%d", now.Month())
	nowpath := path.Join(dataPath, fmt.Sprint(now.Year()), monthStr, fmt.Sprint(now.Day()))

	latest := dataPath
	for level := 0; level < 3; level++ { // year, month, day
		name, ok := maxNumericSubdir(latest)
		if !ok {
			latest = nowpath
			break
		}
		latest = path.Join(latest, name)
	}

	// Best effort: this function has no error result, and NewSaveWriter reads
	// the returned directory right away, which reports a missing directory.
	_ = os.MkdirAll(latest, 0777)
	return latest
}

// maxNumericSubdir returns the name of the sub-directory of dir whose name is
// the largest positive integer. ok is false when dir cannot be read or has no
// such sub-directory. Entries that are not directories, or whose names are
// not valid integers, are ignored.
func maxNumericSubdir(dir string) (name string, ok bool) {
	infos, err := ioutil.ReadDir(dir)
	if err != nil {
		return "", false
	}
	best := 0
	for _, info := range infos {
		if !info.IsDir() {
			continue
		}
		n, err := strconv.Atoi(info.Name())
		if err != nil {
			continue
		}
		if n > best {
			best, name = n, info.Name()
		}
	}
	return name, name != ""
}