db.go 6.4 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package market
  3. // 本文件实现保存的Tick和Candle文件信息的存取和查询
  4. import (
  5. "database/sql"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "sync"
  10. _ "github.com/go-sql-driver/mysql"
  11. _ "github.com/mattn/go-sqlite3"
  12. )
  13. // 一种产品+周期一个表, 表名字: NAME_TYPE ==> NAME表示产品NAME, TYPE包括: M1,M5,H1,D1,Tick
  14. // 每条记录包括: filePath, startTime, endTime, count ==> 文件路径, 文件中第一条数据时间, 最后一条数据时间, 数据条数
  15. type MyDB struct {
  16. sync.RWMutex // 避免sqlite同时读写发生locked
  17. *sql.DB
  18. }
  19. func NewMyDB(conf *DBConf) (*MyDB, error) {
  20. dsn := conf.DBName
  21. if conf.DBDriver == "mysql" {
  22. dsn = conf.DSN
  23. }
  24. db, err := sql.Open(conf.DBDriver, dsn)
  25. if err != nil {
  26. return nil, err
  27. }
  28. myDB := &MyDB{DB: db}
  29. return myDB, nil
  30. }
  31. func (db *MyDB) ExecSql(sql string) error {
  32. db.RLock()
  33. defer db.RUnlock()
  34. _, err := db.Exec(sql)
  35. if err != nil {
  36. return errors.New("MyDB.ExecSql: db.Exec error: " + err.Error())
  37. }
  38. return nil
  39. }
  40. func (db *MyDB) AllTableName() ([]string, error) {
  41. log.Println("MyDB.AllTableName")
  42. db.RLock()
  43. defer db.RUnlock()
  44. sql := fmt.Sprintf("select name from sqlite_master;")
  45. rows, err := db.Query(sql)
  46. if err != nil {
  47. return nil, errors.New("MyDB.AllTableName error: " + err.Error())
  48. }
  49. defer rows.Close()
  50. tnames := []string{}
  51. var name string
  52. for rows.Next() {
  53. err := rows.Scan(&name)
  54. if err != nil {
  55. log.Fatal(err)
  56. }
  57. tnames = append(tnames, name)
  58. }
  59. return tnames, nil
  60. }
  61. func (db *MyDB) UpdateHisLastTime(tname, typ string, lastTime int64) error {
  62. return db.updateHisLastTime(tname, typ, lastTime)
  63. }
  64. func (db *MyDB) updateHisLastTime(tname, typ string, lastTime int64) error {
  65. db.Lock()
  66. defer db.Unlock()
  67. sql := fmt.Sprintf("update %s set LastTime=? where Id=?", tname)
  68. _, err := db.Exec(sql, lastTime, typ)
  69. if err != nil {
  70. return err
  71. }
  72. return nil
  73. }
  74. func (db *MyDB) GetHisLastTime(tname, typ string) (int64, error) {
  75. var lasttime int64
  76. err := db.createHisTable(tname)
  77. if err != nil {
  78. return 0, err
  79. }
  80. lasttime, err = db.getHisLastTime(tname, typ)
  81. if lasttime == -1 {
  82. if err != nil {
  83. log.Println("db.getHisLastTime", err)
  84. }
  85. err = db.insertHis(typ, tname, 0)
  86. }
  87. return lasttime, err
  88. }
  89. func (db *MyDB) createHisTable(name string) error {
  90. db.Lock()
  91. defer db.Unlock()
  92. sql := fmt.Sprintf("create table if not exists %s (Id varchar(32), LastTime bigint, PRIMARY KEY (Id) )", name)
  93. _, err := db.Exec(sql)
  94. if err != nil {
  95. return errors.New("MyDB.createHisTable error: " + err.Error())
  96. }
  97. return nil
  98. }
  99. func (db *MyDB) getHisLastTime(tname, typ string) (int64, error) {
  100. db.RLock()
  101. defer db.RUnlock()
  102. var id string
  103. lasttime := int64(-1)
  104. sql := fmt.Sprintf("select * from %s where Id = ?", tname)
  105. Rows, err := db.Query(sql, typ)
  106. if err != nil {
  107. return lasttime, err
  108. }
  109. defer Rows.Close()
  110. for Rows.Next() {
  111. err := Rows.Scan(&id, &lasttime)
  112. if err != nil {
  113. return lasttime, err
  114. }
  115. }
  116. return lasttime, nil
  117. }
  118. func (db *MyDB) insertHis(typ, tname string, lasttime int64) error {
  119. db.Lock()
  120. defer db.Unlock()
  121. sql := fmt.Sprintf("insert into %s values(?,?)", tname)
  122. _, err := db.Exec(sql, typ, lasttime)
  123. if err != nil {
  124. return errors.New("MyDb.insertHis error: " + err.Error())
  125. }
  126. return nil
  127. }
  128. func (db *MyDB) createInsTable(name string) error {
  129. // db.Lock()
  130. // defer db.Unlock()
  131. sql := fmt.Sprintf("create table if not exists %s (Id varchar(32), Name text, Typ text, Exid text, PriceInc real, Margin real, TickStartTime bigint, M1StartTime bigint, D1StartTime bigint, PRIMARY KEY (Id) )", name)
  132. _, err := db.Exec(sql)
  133. if err != nil {
  134. return errors.New("MyDB.createInsTable error: " + err.Error())
  135. }
  136. return nil
  137. }
  138. func (db *MyDB) InsertIns(ins *Instrument, tname string) error {
  139. return db.insertIns(ins, tname)
  140. }
  141. func (db *MyDB) insertIns(ins *Instrument, tname string) error {
  142. db.Lock()
  143. defer db.Unlock()
  144. err := db.createInsTable(tname)
  145. if err != nil {
  146. return errors.New("MyDB.insertIns: db.createInsTable error: " + err.Error() + tname)
  147. }
  148. sql := fmt.Sprintf("insert into %s values(?,?,?,?,?,?,?,?,?)", tname)
  149. _, err = db.Exec(sql, ins.Id, ins.Name, ins.Typ, ins.ExId, ins.PriceInc, ins.Margin, ins.StartTime, ins.StartTime, ins.StartTime)
  150. if err != nil {
  151. return errors.New("MyDb.insertIns error: " + err.Error())
  152. }
  153. return nil
  154. }
  155. func (db *MyDB) getIns(tname string) (map[string]*Instrument, error) {
  156. db.RLock()
  157. defer db.RUnlock()
  158. sql := fmt.Sprintf("select * from %s", tname)
  159. Rows, err := db.Query(sql)
  160. if err != nil {
  161. return nil, err
  162. }
  163. defer Rows.Close()
  164. m := make(map[string]*Instrument)
  165. for Rows.Next() {
  166. ins := &Instrument{}
  167. err := Rows.Scan(&ins.Id, &ins.Name, &ins.Typ, &ins.ExId, &ins.PriceInc, &ins.Margin, &ins.StartTime, &ins.EndTime)
  168. if err != nil {
  169. return nil, err
  170. }
  171. m[ins.Id] = ins
  172. }
  173. return m, nil
  174. }
  175. func (db *MyDB) UpdateInsStartTime(tname, insId string, startTime int64, period int) error {
  176. return db.updateInsStartTime(tname, insId, startTime, period)
  177. }
  178. func (db *MyDB) updateInsStartTime(tname, insId string, startTime int64, period int) error {
  179. db.Lock()
  180. defer db.Unlock()
  181. var field string
  182. if period == 0 {
  183. field = "TickStartTime"
  184. } else if period == M1 {
  185. field = "M1StartTime"
  186. } else {
  187. field = "D1StartTime"
  188. }
  189. sql := fmt.Sprintf("update %s set %s=? where Id=? and %s < ?", tname, field, field)
  190. _, err := db.Exec(sql, startTime, insId, startTime)
  191. if err != nil {
  192. return err
  193. }
  194. return nil
  195. }
  196. func (db *MyDB) GetInsStartTime(tname, insId string, period int) (int64, error) {
  197. return db.getInsStartTime(tname, insId, period)
  198. }
  199. func (db *MyDB) getInsStartTime(tname, insId string, period int) (int64, error) {
  200. db.RLock()
  201. defer db.RUnlock()
  202. var field string
  203. if period == 0 {
  204. field = "TickStartTime"
  205. } else if period == M1 {
  206. field = "M1StartTime"
  207. } else {
  208. field = "D1StartTime"
  209. }
  210. sql := fmt.Sprintf("select %s from %s where Id = '%s'", field, tname, insId)
  211. Rows, err := db.Query(sql)
  212. if err != nil {
  213. return -1, err
  214. }
  215. defer Rows.Close()
  216. var startTime int64
  217. for Rows.Next() {
  218. err := Rows.Scan(&startTime)
  219. if err != nil {
  220. return -1, err
  221. }
  222. return startTime, nil
  223. }
  224. return -1, errors.New("Cannot get start time.")
  225. }