// log_store.go
  1. package store
  2. import "io"
  3. import "bufio"
  4. import "os"
  5. import "io/ioutil"
  6. import "strings"
  7. import "encoding/binary"
  8. import "encoding/json"
  9. import "errors"
  10. import "time"
  11. import "fmt"
  12. import "log"
  13. import "path"
  14. import "database/sql"
  15. import "strconv"
  16. import "compress/gzip"
  17. //import "sync"
const (
	// headerLen is the size in bytes of the per-file header:
	// lasttime(int32) + readonly(int32) + count(int32), little-endian.
	headerLen = 12
	// fileDuration is the span of one data file in seconds (one hour).
	fileDuration = 3600
)
// DataLocation describes where records for a time range live inside a data
// file. NOTE(review): nothing in this file uses it except commented-out
// index code; presumably kept for a future index feature — confirm.
type DataLocation struct {
	start   int32           // first record time (unix seconds, per sibling usage)
	end     int32           // last record time
	count   int32           // number of records
	datlocs map[int32]int32 // presumably record time -> file offset; verify against the disabled index code
}
// LoggingSave is the contract for a fixed-size, binary-serializable record
// that the store persists and reloads.
type LoggingSave interface {
	GetTime() int32 // record timestamp, unix seconds
	GetData() []byte // serialized form; the writer expects exactly Size() bytes
	LoadData([]byte) LoggingSave // deserialize a buffer into a record
	Size() int // fixed serialized size in bytes
	GetId() int64 // record identifier
}
// byTimeLog implements sort.Interface, ordering records by ascending time.
type byTimeLog []LoggingSave

func (a byTimeLog) Len() int           { return len(a) }
func (a byTimeLog) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byTimeLog) Less(i, j int) bool { return a[i].GetTime() < a[j].GetTime() }
// Save stores and manages records of one data type in time order,
// one binary file per hour.
type Save struct {
	lasttime  int32 // timestamp of the most recently written record
	lastcount int32 // number of records in the current file
	readonly  int32 // read-only marker: the file is finished and must not be updated again
	//posMap map[int32]map[int64]int64
	//posMut sync.Mutex
	cur      *os.File      // current hour's data file
	bufw     *bufio.Writer // buffered writer over cur
	path     string        // directory holding the current day's files
	basePath string        // root directory for this data type
	typ      string        // data type name (also the subdirectory name)
	zipCh    chan string   // finished files queued for compression
	db       *sql.DB       // index database for finished files
	//bDownload bool // whether historical data is downloaded externally
}
  55. //1. 文件每个小时一个
  56. //2. 使用的时候保证只有一个线程在写
  57. //3. 表示这个文件的meta信息:
  58. // 最后一个数据的时间(int32),是否只读(int32), 数据的数目(int32) 其他标记(int64)
  59. // 在创建下一个文件的时候,把上一个文件的这些信息补全。
  60. func NewSaveWriter(dir, typ string, bDownload bool, empty LoggingSave, db *sql.DB) (*Save, error) {
  61. dataPath := fmt.Sprintf("%s/%s", dir, typ)
  62. _, err := os.Stat(dataPath)
  63. if err != nil {
  64. err := os.MkdirAll(dataPath, 0777)
  65. if err != nil {
  66. return nil, err
  67. }
  68. }
  69. basePath := dataPath
  70. //if !bDownload {
  71. dataPath = getCurPath(dataPath)
  72. //}
  73. infos, err := ioutil.ReadDir(dataPath)
  74. if err != nil {
  75. return nil, err
  76. }
  77. s := &Save{}
  78. s.zipCh = make(chan string, 1024)
  79. s.path = dataPath
  80. s.basePath = basePath
  81. s.typ = typ
  82. s.db = db
  83. //s.posMap = make(map[int32]map[int64]int64)
  84. //s.bDownload = bDownload
  85. var files []string
  86. for i := 0; i < len(infos); i++ {
  87. name := infos[i].Name()
  88. if strings.HasSuffix(name, ".bin") {
  89. files = append(files, dataPath+"/"+name)
  90. }
  91. }
  92. if len(files) > 0 {
  93. //最后一个文件可能会在程序崩溃的时候没有正确的处理
  94. lastfile := files[len(files)-1]
  95. fp, err := os.Open(lastfile)
  96. if err != nil {
  97. return nil, err
  98. }
  99. info, err := fp.Stat()
  100. if err != nil {
  101. return nil, err
  102. }
  103. log.Println("info::", info.Size())
  104. if info.Size() < headerLen {
  105. fp.Close()
  106. os.Remove(lastfile)
  107. return s, nil
  108. }
  109. err = binary.Read(fp, binary.LittleEndian, &s.lasttime)
  110. if err != nil {
  111. return nil, err
  112. }
  113. err = binary.Read(fp, binary.LittleEndian, &s.readonly)
  114. if err != nil {
  115. return nil, err
  116. }
  117. log.Println("read header")
  118. if s.readonly == 0 {
  119. //检查文件的完整性
  120. log.Println("checking")
  121. onesize := empty.Size()
  122. truncate := info.Size() - (info.Size()-int64(headerLen))%int64(onesize)
  123. s.lastcount = int32((truncate - int64(headerLen)) / int64(onesize))
  124. if info.Size() != truncate {
  125. fp.Close()
  126. fp, _ = os.OpenFile(lastfile, os.O_WRONLY, 0777)
  127. err := fp.Truncate(truncate)
  128. if err != nil {
  129. log.Println("truncate", err, truncate, info.Size(), s.lastcount)
  130. fp.Close()
  131. return nil, err
  132. }
  133. fp.Close()
  134. fp, _ = os.Open(lastfile)
  135. }
  136. log.Println("fixing")
  137. //修复最后一个数据的时间
  138. if s.lastcount > 0 {
  139. fp.Seek(truncate-int64(empty.Size()), os.SEEK_SET)
  140. buf := make([]byte, empty.Size())
  141. io.ReadFull(fp, buf)
  142. s.lasttime = empty.LoadData(buf).GetTime()
  143. }
  144. log.Println("open current")
  145. s.cur.Close()
  146. s.cur, err = os.OpenFile(lastfile, os.O_APPEND|os.O_WRONLY, 0777)
  147. s.bufw = bufio.NewWriter(s.cur)
  148. //s.indexingFile(lastfile, empty)
  149. }
  150. }
  151. go s.ZipAndSort(empty)
  152. return s, nil
  153. }
  154. func (s *Save) Save(data LoggingSave) error {
  155. t := data.GetTime()
  156. lastT := (s.lasttime / fileDuration) * fileDuration
  157. if lastT > t { //必须不能比base time 还要小,没有完全严格的顺序
  158. //log.Println(lastT, t)
  159. return errors.New("ErrTime")
  160. }
  161. baseT := (t / fileDuration) * fileDuration
  162. if lastT == baseT {
  163. if s.bufw == nil {
  164. log.Println("no writer", s.typ)
  165. return errors.New("no writer")
  166. }
  167. n, err := s.bufw.Write(data.GetData())
  168. if err != nil {
  169. log.Println("write data err", err)
  170. return err
  171. }
  172. //log.Println("write", n)
  173. if n != data.Size() {
  174. errinfo := fmt.Sprintf("1,n:%d, size:%d", n, data.Size())
  175. return errors.New(errinfo)
  176. }
  177. s.lasttime = t
  178. s.lastcount++
  179. /*id := data.GetId()
  180. s.posMut.Lock()
  181. _, ok := s.posMap[t]
  182. if !ok {
  183. s.posMap[t] = make(map[int64]int64)
  184. }
  185. s.posMap[t][id] = int64(headerLen + (s.lastcount-1)*int32(data.Size()))
  186. s.posMut.Unlock()*/
  187. return nil
  188. }
  189. //not the same, 新建文件,原来的文件被冻结不再保存数据(一小部分数据可能丢失)
  190. if s.cur != nil {
  191. fname := s.cur.Name()
  192. s.Flush()
  193. s.cur.Close()
  194. s.cur, _ = os.OpenFile(fname, os.O_WRONLY, 0777)
  195. //_, err := s.cur.Seek(0, os.SEEK_SET)
  196. //if err != nil {
  197. //return err
  198. //}
  199. err := binary.Write(s.cur, binary.LittleEndian, s.lasttime)
  200. if err != nil {
  201. return err
  202. }
  203. s.readonly = 1
  204. err = binary.Write(s.cur, binary.LittleEndian, s.readonly)
  205. if err != nil {
  206. return err
  207. }
  208. err = binary.Write(s.cur, binary.LittleEndian, s.lastcount)
  209. if err != nil {
  210. return err
  211. }
  212. s.cur.Close()
  213. s.zipCh <- fname
  214. }
  215. //文件名用base time 生成 baseT
  216. name := s.getFileName(int64(baseT))
  217. s.cur, _ = os.Create(name)
  218. s.bufw = bufio.NewWriter(s.cur)
  219. s.lastcount = 0
  220. s.lasttime = 0
  221. s.readonly = 0
  222. //s.files[baseT] = name
  223. //if s.bDownload { //服务器会从外部下载历史数据,所以本地只需保留一天的数据
  224. //for len(s.files) > 24 {
  225. //os.Remove(s.files[0])
  226. ////delete(s.filelocations, s.files[0])
  227. //s.files = s.files[1:]
  228. //}
  229. //}
  230. err := binary.Write(s.cur, binary.LittleEndian, s.lasttime)
  231. if err != nil {
  232. return err
  233. }
  234. err = binary.Write(s.cur, binary.LittleEndian, s.readonly)
  235. if err != nil {
  236. return err
  237. }
  238. err = binary.Write(s.cur, binary.LittleEndian, s.lastcount)
  239. if err != nil {
  240. return err
  241. }
  242. n, err := s.bufw.Write(data.GetData())
  243. if err != nil {
  244. return err
  245. }
  246. if n != data.Size() {
  247. errinfo := fmt.Sprintf("2,n:%d, size:%d", n, data.Size())
  248. return errors.New(errinfo)
  249. }
  250. s.lasttime = t
  251. s.lastcount++
  252. /*id := data.GetId()
  253. s.posMut.Lock()
  254. s.posMap = nil
  255. s.posMap = make(map[int32]map[int64]int64)
  256. s.posMap[t] = make(map[int64]int64)
  257. s.posMap[t][id] = headerLen
  258. s.posMut.Unlock()*/
  259. return nil
  260. }
  261. func (s *Save) Close() {
  262. s.cur.Close()
  263. }
  264. func (s *Save) Flush() {
  265. s.bufw.Flush()
  266. s.cur.Sync()
  267. }
  268. func (s *Save) getFileName(baseT int64) string {
  269. t := time.Unix(baseT, 0)
  270. monthStr := fmt.Sprintf("%d", t.Month())
  271. dir := path.Join(s.basePath, fmt.Sprint(t.Year()), monthStr, fmt.Sprint(t.Day()))
  272. if dir != s.path {
  273. os.MkdirAll(dir, 0777)
  274. s.path = dir
  275. }
  276. fname := s.path + "/" + time.Unix(baseT, 0).Format("2006-01-02-15-04-05") + ".bin"
  277. return fname
  278. }
  279. /*func (s *Save) indexingFile(fname string, empty LoggingSave) error {
  280. fp, err := os.Open(fname)
  281. if err != nil {
  282. return err
  283. }
  284. defer fp.Close()
  285. var lasttime, readonly, lastcount int32
  286. err = binary.Read(fp, binary.LittleEndian, &lasttime)
  287. if err != nil {
  288. return err
  289. }
  290. err = binary.Read(fp, binary.LittleEndian, &readonly)
  291. if err != nil {
  292. return err
  293. }
  294. err = binary.Read(fp, binary.LittleEndian, &lastcount)
  295. if err != nil {
  296. return err
  297. }
  298. pos := headerLen
  299. buf := make([]byte, empty.Size())
  300. for err == nil {
  301. _, err = io.ReadFull(fp, buf)
  302. if err != nil {
  303. if err == io.EOF {
  304. return nil
  305. } else {
  306. return err
  307. }
  308. }
  309. data := empty.LoadData(buf)
  310. time := data.GetTime()
  311. id := data.GetId()
  312. _, ok := s.posMap[time]
  313. if !ok {
  314. s.posMap[time] = make(map[int64]int64)
  315. }
  316. s.posMap[time][id] = int64(pos)
  317. pos += empty.Size()
  318. }
  319. return nil
  320. }*/
  321. func (s *Save) saveDB(start, end, count int32, fname string) error {
  322. //要么成功,要么就是失败
  323. tx, err := s.db.Begin()
  324. if err != nil {
  325. return err
  326. }
  327. q := fmt.Sprintf("insert into tick_index (begtime, endtime, path, ty, tickcount, totalcount) values ('%d', '%d', '%s', '%s', '%d', '%d')",
  328. start, end, fname, s.typ, count, 0)
  329. _, err = tx.Exec(q)
  330. //log.Println("debug saveDB", q)
  331. if err != nil {
  332. tx.Rollback()
  333. return err
  334. }
  335. return tx.Commit()
  336. }
  337. func zipAndSort(fname string, empty LoggingSave, start, end, count *int32) (string, error) {
  338. fp, err := os.Open(fname)
  339. if err != nil {
  340. return "", err
  341. }
  342. defer fp.Close()
  343. var lasttime, readonly, lastcount int32
  344. err = binary.Read(fp, binary.LittleEndian, &lasttime)
  345. if err != nil {
  346. return "", err
  347. }
  348. err = binary.Read(fp, binary.LittleEndian, &readonly)
  349. if err != nil {
  350. return "", err
  351. }
  352. err = binary.Read(fp, binary.LittleEndian, &lastcount)
  353. if err != nil {
  354. return "", err
  355. }
  356. buf := make([]byte, empty.Size())
  357. _, err = io.ReadFull(fp, buf)
  358. data := empty.LoadData(buf)
  359. *start = data.GetTime()
  360. *end = lasttime
  361. *count = lastcount
  362. fnameZip := strings.Replace(fname, ".bin", ".gz", 1)
  363. fpz, err := os.Create(fnameZip)
  364. if err != nil {
  365. return "", err
  366. }
  367. defer fpz.Close()
  368. gw := gzip.NewWriter(fpz)
  369. defer gw.Close()
  370. _, err = fp.Seek(0, os.SEEK_SET)
  371. _, err = io.Copy(gw, fp)
  372. if err != nil {
  373. if err != io.EOF {
  374. return "", err
  375. }
  376. }
  377. gw.Flush()
  378. fp.Close()
  379. os.Remove(fname)
  380. return fnameZip, nil
  381. }
  382. func (s *Save) ZipAndSort(empty LoggingSave) {
  383. for {
  384. fname := <-s.zipCh
  385. var start, end, count int32
  386. fnameZip, err := zipAndSort(fname, empty, &start, &end, &count)
  387. if fnameZip == "" {
  388. log.Println("ZipAndSort", fname, err)
  389. } else {
  390. err := s.saveDB(start, end, count, fnameZip)
  391. if err != nil {
  392. log.Println("debug ZipAndSort", err)
  393. }
  394. }
  395. }
  396. }
// PrintDataIndex is a debugging hook whose body is entirely disabled; the
// commented code printed in-memory indexes (filelocations/timefiles/files)
// that no longer exist on Save.
func (s *Save) PrintDataIndex() {
	//for fname, datloc := range s.filelocations {
	//log.Println(fname, datloc.count, datloc.start, datloc.end)
	//for k, v := range datloc.datlocs {
	//log.Println(k, v)
	//}
	//}
	//for k, v := range s.timefiles {
	//log.Println(k, v)
	//}
	//for i, v := range s.files {
	//log.Println(i, v)
	//}
}
  411. func getBaseTime(fname string) int32 {
  412. _, name := path.Split(fname)
  413. shortname := strings.Split(name, ".")[0]
  414. shortname += "+08:00"
  415. t, _ := time.Parse("2006-01-02-15-04-05Z07:00", shortname)
  416. return int32(t.Unix())
  417. }
// GetData is an unimplemented stub that always returns nil; the previous
// position-map based implementation is preserved in the comment block
// that follows it.
func (s *Save) GetData(start, end int64, offset, count int, empty LoggingSave) []LoggingSave {
	return nil
}
  421. /*func (s *Save) GetData(start, end int64, offset, count int, empty LoggingSave) []LoggingSave {
  422. var datas []LoggingSave
  423. pos := int64(-1)
  424. if start == 0 {
  425. pos = headerLen
  426. } else {
  427. id := end
  428. s.posMut.Lock()
  429. poss, ok := s.posMap[int32(start)]
  430. if ok {
  431. pos = poss[id]
  432. }
  433. s.posMut.Unlock()
  434. }
  435. if pos == -1 {
  436. return nil
  437. }
  438. onesize := empty.Size()
  439. buf := make([]byte, onesize)
  440. if s.cur == nil {
  441. return nil
  442. }
  443. fp, err := os.Open(s.cur.Name())
  444. if err != nil {
  445. log.Println("GetData.Open", err)
  446. return datas
  447. }
  448. defer fp.Close()
  449. _, err = fp.Seek(int64(pos), os.SEEK_SET)
  450. if err != nil {
  451. log.Println("GetData.Seek", err)
  452. return datas
  453. }
  454. if offset > 0 {
  455. pos, err := fp.Seek(0, os.SEEK_CUR)
  456. if err != nil {
  457. log.Println("GetData.Seek", err)
  458. return datas
  459. }
  460. fi, err := fp.Stat()
  461. if err != nil {
  462. log.Println("GetData.Stat", err)
  463. return datas
  464. }
  465. datnum := int((fi.Size() - pos)) / onesize
  466. if datnum > offset {
  467. _, err = fp.Seek(int64(offset*onesize), os.SEEK_CUR)
  468. if err != nil {
  469. log.Println("GetData.Seek", err)
  470. return datas
  471. }
  472. offset = 0
  473. } else {
  474. return datas
  475. }
  476. }
  477. for err == nil {
  478. _, err = io.ReadFull(fp, buf)
  479. if err != nil {
  480. if err != io.EOF {
  481. log.Println("GetData.read.data", err)
  482. }
  483. return datas
  484. }
  485. data := empty.LoadData(buf)
  486. if count > 0 {
  487. datas = append(datas, data)
  488. count--
  489. if count == 0 {
  490. return datas
  491. }
  492. }
  493. }
  494. return datas
  495. }*/
  496. func ReadIndex(fname string) ([fileDuration]int64, error) {
  497. var indexs [fileDuration]int64
  498. fnameIdx := strings.Replace(fname, ".bin", ".idx", 1)
  499. f, err := os.Open(fnameIdx)
  500. if err != nil {
  501. return indexs, err
  502. }
  503. defer f.Close()
  504. dec := json.NewDecoder(f)
  505. err = dec.Decode(&indexs)
  506. if err != nil {
  507. return indexs, err
  508. }
  509. return indexs, nil
  510. }
  511. func getCurPath(dataPath string) string {
  512. t := time.Now()
  513. monthStr := fmt.Sprintf("%d", t.Month())
  514. nowpath := path.Join(dataPath, fmt.Sprint(t.Year()), monthStr, fmt.Sprint(t.Day()))
  515. infos, err := ioutil.ReadDir(dataPath)
  516. if err != nil {
  517. os.MkdirAll(nowpath, 0777)
  518. return nowpath
  519. }
  520. var yearpath, monthpath, daypath, emptystr string
  521. var iYear, iMonth, iDay int
  522. //tBase := time.Unix(0, 0)
  523. for _, v := range infos {
  524. if v.IsDir() { //&& (v.Name() > fmt.Sprint(tBase.Year()) && v.Name() <= fmt.Sprint(t.Year()))
  525. itmp, _ := strconv.Atoi(v.Name())
  526. if itmp > iYear {
  527. yearpath = v.Name()
  528. iYear = itmp
  529. }
  530. }
  531. }
  532. if yearpath == emptystr {
  533. os.MkdirAll(nowpath, 0777)
  534. return nowpath
  535. } else {
  536. dataPath = path.Join(dataPath, yearpath)
  537. infos, err = ioutil.ReadDir(dataPath)
  538. if err != nil {
  539. os.MkdirAll(nowpath, 0777)
  540. return nowpath
  541. }
  542. for _, v := range infos {
  543. if v.IsDir() { //&& (v.Name() >= "1" && v.Name() <= "12")
  544. itmp, _ := strconv.Atoi(v.Name())
  545. if itmp > iMonth {
  546. monthpath = v.Name()
  547. iMonth = itmp
  548. }
  549. }
  550. }
  551. if monthpath == emptystr {
  552. os.MkdirAll(nowpath, 0777)
  553. return nowpath
  554. } else {
  555. dataPath = path.Join(dataPath, monthpath)
  556. infos, err = ioutil.ReadDir(dataPath)
  557. if err != nil {
  558. os.MkdirAll(nowpath, 0777)
  559. return nowpath
  560. }
  561. for _, v := range infos {
  562. if v.IsDir() { // && (v.Name() >= "1" && v.Name() <= "31")
  563. itmp, _ := strconv.Atoi(v.Name())
  564. if itmp > iDay {
  565. daypath = v.Name()
  566. iDay = itmp
  567. }
  568. }
  569. }
  570. if daypath == emptystr {
  571. os.MkdirAll(nowpath, 0777)
  572. return nowpath
  573. } else {
  574. dataPath = path.Join(dataPath, daypath)
  575. os.MkdirAll(dataPath, 0777)
  576. return dataPath
  577. }
  578. }
  579. }
  580. os.MkdirAll(nowpath, 0777)
  581. return nowpath
  582. }