// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package market // 本文件实现保存的Tick和Candle文件信息的存取和查询 import ( "database/sql" "errors" "fmt" "log" "sync" _ "github.com/go-sql-driver/mysql" _ "github.com/mattn/go-sqlite3" ) // 一种产品+周期一个表, 表名字: NAME_TYPE ==> NAME表示产品NAME, TYPE包括: M1,M5,H1,D1,Tick // 每条记录包括: filePath, startTime, endTime, count ==> 文件路径, 文件中第一条数据时间, 最后一条数据时间, 数据条数 type MyDB struct { sync.RWMutex // 避免sqlite同时读写发生locked *sql.DB } func NewMyDB(conf *DBConf) (*MyDB, error) { dsn := conf.DBName if conf.DBDriver == "mysql" { dsn = conf.DSN } db, err := sql.Open(conf.DBDriver, dsn) if err != nil { return nil, err } myDB := &MyDB{DB: db} return myDB, nil } func (db *MyDB) ExecSql(sql string) error { db.RLock() defer db.RUnlock() _, err := db.Exec(sql) if err != nil { return errors.New("MyDB.ExecSql: db.Exec error: " + err.Error()) } return nil } func (db *MyDB) AllTableName() ([]string, error) { log.Println("MyDB.AllTableName") db.RLock() defer db.RUnlock() sql := fmt.Sprintf("select name from sqlite_master;") rows, err := db.Query(sql) if err != nil { return nil, errors.New("MyDB.AllTableName error: " + err.Error()) } defer rows.Close() tnames := []string{} var name string for rows.Next() { err := rows.Scan(&name) if err != nil { log.Fatal(err) } tnames = append(tnames, name) } return tnames, nil } func (db *MyDB) UpdateHisLastTime(tname, typ string, lastTime int64) error { return db.updateHisLastTime(tname, typ, lastTime) } func (db *MyDB) updateHisLastTime(tname, typ string, lastTime int64) error { db.Lock() defer db.Unlock() sql := fmt.Sprintf("update %s set LastTime=? where Id=?", tname) _, err := db.Exec(sql, lastTime, typ) if err != nil { return err } return nil } func (db *MyDB) GetHisLastTime(tname, typ string) (int64, error) { var lasttime int64 err := db.createHisTable(tname) if err != nil { return 0, err } lasttime, err = db.getHisLastTime(tname, typ) if lasttime == -1 { if err != nil { log.Println("db.getHisLastTime", err) } err = db.insertHis(typ, tname, 0) } return lasttime, err } func (db *MyDB) createHisTable(name string) error { db.Lock() defer db.Unlock() sql := fmt.Sprintf("create table if not exists %s (Id varchar(32), LastTime bigint, PRIMARY KEY (Id) )", name) _, err := db.Exec(sql) if err != nil { return errors.New("MyDB.createHisTable error: " + err.Error()) } return nil } func (db *MyDB) getHisLastTime(tname, typ string) (int64, error) { db.RLock() defer db.RUnlock() var id string lasttime := int64(-1) sql := fmt.Sprintf("select * from %s where Id = ?", tname) Rows, err := db.Query(sql, typ) if err != nil { return lasttime, err } defer Rows.Close() for Rows.Next() { err := Rows.Scan(&id, &lasttime) if err != nil { return lasttime, err } } return lasttime, nil } func (db *MyDB) insertHis(typ, tname string, lasttime int64) error { db.Lock() defer db.Unlock() sql := fmt.Sprintf("insert into %s values(?,?)", tname) _, err := db.Exec(sql, typ, lasttime) if err != nil { return errors.New("MyDb.insertHis error: " + err.Error()) } return nil } func (db *MyDB) createInsTable(name string) error { // db.Lock() // defer db.Unlock() sql := fmt.Sprintf("create table if not exists %s (Id varchar(32), Name text, Typ text, Exid text, PriceInc real, Margin real, TickStartTime bigint, M1StartTime bigint, D1StartTime bigint, PRIMARY KEY (Id) )", name) _, err := db.Exec(sql) if err != nil { return errors.New("MyDB.createInsTable error: " + err.Error()) } return nil } func (db *MyDB) InsertIns(ins *Instrument, tname string) error { return db.insertIns(ins, tname) } func (db *MyDB) insertIns(ins *Instrument, tname string) error { db.Lock() defer db.Unlock() err := db.createInsTable(tname) if err != nil { return errors.New("MyDB.insertIns: db.createInsTable error: " + err.Error() + tname) } sql := fmt.Sprintf("insert into %s values(?,?,?,?,?,?,?,?,?)", tname) _, err = db.Exec(sql, ins.Id, ins.Name, ins.Typ, ins.ExId, ins.PriceInc, ins.Margin, ins.StartTime, ins.StartTime, ins.StartTime) if err != nil { return errors.New("MyDb.insertIns error: " + err.Error()) } return nil } func (db *MyDB) getIns(tname string) (map[string]*Instrument, error) { db.RLock() defer db.RUnlock() sql := fmt.Sprintf("select * from %s", tname) Rows, err := db.Query(sql) if err != nil { return nil, err } defer Rows.Close() m := make(map[string]*Instrument) for Rows.Next() { ins := &Instrument{} err := Rows.Scan(&ins.Id, &ins.Name, &ins.Typ, &ins.ExId, &ins.PriceInc, &ins.Margin, &ins.StartTime, &ins.EndTime) if err != nil { return nil, err } m[ins.Id] = ins } return m, nil } func (db *MyDB) UpdateInsStartTime(tname, insId string, startTime int64, period int) error { return db.updateInsStartTime(tname, insId, startTime, period) } func (db *MyDB) updateInsStartTime(tname, insId string, startTime int64, period int) error { db.Lock() defer db.Unlock() var field string if period == 0 { field = "TickStartTime" } else if period == M1 { field = "M1StartTime" } else { field = "D1StartTime" } sql := fmt.Sprintf("update %s set %s=? where Id=? and %s < ?", tname, field, field) _, err := db.Exec(sql, startTime, insId, startTime) if err != nil { return err } return nil } func (db *MyDB) GetInsStartTime(tname, insId string, period int) (int64, error) { return db.getInsStartTime(tname, insId, period) } func (db *MyDB) getInsStartTime(tname, insId string, period int) (int64, error) { db.RLock() defer db.RUnlock() var field string if period == 0 { field = "TickStartTime" } else if period == M1 { field = "M1StartTime" } else { field = "D1StartTime" } sql := fmt.Sprintf("select %s from %s where Id = '%s'", field, tname, insId) Rows, err := db.Query(sql) if err != nil { return -1, err } defer Rows.Close() var startTime int64 for Rows.Next() { err := Rows.Scan(&startTime) if err != nil { return -1, err } return startTime, nil } return -1, errors.New("Cannot get start time.") }