|
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- // 本程序用来把永华的期货数据导入到tickserver中
- package main
- import (
- "database/sql"
- "encoding/json"
- "fmt"
- "log"
- "os"
- "path"
- "sort"
- "strings"
- "tickserver/server/market"
- )
// pair lists every symbol whose ticks are imported. getTick starts one
// reader goroutine and one writer goroutine per entry, in this order.
var pair = []string{
	// EUR and major USD crosses
	"EURUSD", "EURGBP", "GBPUSD", "USDJPY",
	"USDCHF", "AUDUSD", "USDCAD", "NZDUSD",
	// JPY, EUR and GBP crosses
	"CHFJPY", "EURJPY", "EURCHF", "EURAUD",
	"EURCAD", "GBPCHF", "GBPJPY",
	// oil
	"OILUSD",
	// remaining currency crosses
	"CADJPY", "AUDJPY", "AUDCAD", "AUDNZD",
	// silver and gold
	"XAGUSD", "XAUUSD",
}
// Period indexes, used as the keys of PeriodMap.
const (
	PTK = iota // 0: tick
	PM1        // 1: one minute
	PM5        // 2: five minutes
	PH1        // 3: one hour
	PD1        // 4: one day
)

// Period durations, used as the keys of PeriodNameMap.
// NOTE(review): the values look like seconds (M1 == 60) - confirm.
const (
	TK = 0
	M1 = 60
	M5 = 300
	H1 = 3600
	D1 = 86400
)
// DATATYPE is the prefix getInstrumentName puts in front of every symbol.
const DATATYPE = "easyforex"

// BUFLEN is a buffer length of 1024.
// NOTE(review): nothing in this file references it; getTick hard-codes the
// same number for its channel capacity - confirm whether they should be tied.
const BUFLEN = 1024
- var PeriodMap = map[int]int{
- PTK: TK,
- PM1: M1,
- PM5: M5,
- PH1: H1,
- PD1: D1,
- }
- var PeriodNameMap = map[int]string{
- TK: "Tick",
- M1: "M1",
- M5: "M5",
- H1: "H1",
- D1: "D1",
- }
// Tick is one row read from a source tick table.
//
// The field order matters: getTickFromDB builds the end-of-stream marker
// with a positional literal.
type Tick struct {
	// time is the value of the "time" column; -1 marks the end of the stream.
	// NOTE(review): looks like Unix seconds (processTick divides it by 86400
	// to get a day number) - confirm.
	time int64

	// buyPercent is the value of the "buy_percent" column.
	buyPercent float64
}
// Conf is the configuration ReadConf decodes from efhisconv.json.
type Conf struct {
	// DSN is the data source name handed to sql.Open, for example
	// dsn = fmt.Sprintf("root:fzm@1001@/%s?charset=%s", dbName, "utf8").
	DSN string

	// DataDir is the root directory the converted tick and candle files
	// are written under.
	DataDir string
}
- type byTime []market.Candle
- func (a byTime) Len() int { return len(a) }
- func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
- func (a byTime) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp }
- var conf *Conf
- func main() {
- // set log
- /*logF, err := os.Create("./hisconv.log.txt")
- if err != nil {
- log.Fatal(err)
- }
- defer logF.Close()
- log.SetOutput(logF)*/
- var err error
- conf, err = ReadConf()
- if err != nil {
- log.Fatal(err)
- }
- go getTick()
- select {}
- }
// connectDB opens a database handle for dsn through the "mysql" driver.
// It returns a nil handle together with the error when sql.Open fails.
func connectDB(dsn string) (*sql.DB, error) {
	handle, openErr := sql.Open("mysql", dsn)
	if openErr == nil {
		return handle, nil
	}
	return nil, openErr
}
- func getInstrumentName(symbol string) string {
- return DATATYPE + "_" + symbol
- }
- func getTableName(symbol string, period int) string {
- tableName := symbol
- tableName += "_"
- tableName += PeriodNameMap[period]
- return strings.ToLower(tableName)
- }
- func getTickFromDB(symbol string, period int, tkCh chan<- Tick) error {
- db, err := connectDB(conf.DSN)
- if err != nil {
- log.Println("error: connect db", symbol, err)
- return err
- }
- defer db.Close()
- tableName := getTableName(symbol, period)
- var offset int
- for {
- queryString := fmt.Sprintf("SELECT time,buy_percent FROM %s order by time ASC limit %d,1000", tableName, offset)
- //log.Println(queryString)
- rows, err := db.Query(queryString)
- if err != nil {
- log.Println("error: query", symbol, err)
- return err
- }
- defer rows.Close()
- var count int
- for rows.Next() {
- var tk Tick
- if err := rows.Scan(&tk.time, &tk.buyPercent); err != nil {
- log.Println("error: scan", symbol, err)
- return err
- }
- tkCh <- tk
- count++
- }
- offset += count
- if count < 1000 {
- break
- }
- }
- log.Println("source tick of", symbol, "finished.")
- tk := Tick{-1, 0.0} //notify save goroutine there's no data anymore
- tkCh <- tk
- return nil
- }
- func processTick(symbol string, tkCh <-chan Tick) {
- var ticks []market.Tick
- insId := getInstrumentName(symbol)
- var last int64
- for {
- tk := <-tkCh
- if tk.time == -1 || ((tk.time/(3600*24) != last) && (last != 0)) {
- if len(ticks) > 0 {
- fname, err := market.SaveTickEx(conf.DataDir, ticks, insId, true)
- if err != nil {
- log.Println("error: savetickex", symbol, fname, err)
- }
- err = convAndSaveCandles(insId, ticks)
- if err != nil {
- log.Println("error: convandsavecandles", symbol, err)
- }
- ticks = nil
- }
- if tk.time == -1 {
- break
- }
- }
- var mk market.Tick
- mk.Timestamp = tk.time * 1000
- mk.Price = 100 - tk.buyPercent
- ticks = append(ticks, mk)
- last = tk.time / (3600 * 24)
- }
- }
- func getTick() (err error) {
- for _, symbol := range pair {
- tkCh := make(chan Tick, 1024)
- go getTickFromDB(symbol, TK, tkCh)
- go processTick(symbol, tkCh)
- }
- return nil
- }
- func convAndSaveCandles(insId string, ticks []market.Tick) error {
- //refer := path
- var candles []market.Candle
- pa := []int{market.M1, market.M5, market.H1, market.D1}
- for _, period := range pa {
- var err error
- if period == market.M1 {
- candles, err = convCandles0(ticks, insId, market.M1)
- } else {
- candles, err = convCandles1(candles, insId, period)
- }
- if err != nil {
- return err
- }
- if period == market.D1 {
- dir := path.Join(conf.DataDir, insId)
- os.MkdirAll(dir, 0777)
- var bname string
- bname = fmt.Sprintf("%s.gz", market.PeriodNameMap[period])
- fname := path.Join(dir, bname)
- candles, _ = combinEx(fname, candles)
- }
- _, err = market.SaveCandlesEx(conf.DataDir, insId, candles, period, true)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func convCandles0(ticks []market.Tick, insId string, period int) ([]market.Candle, error) {
- r := market.NewTickBuf(ticks)
- return market.TickConvCandle(r, insId, period)
- }
- func convCandles1(candles []market.Candle, insId string, period int) ([]market.Candle, error) {
- r := market.NewCandleBuf(candles)
- return market.ConvPeriod(r, insId, period)
- }
- func ReadConf() (*Conf, error) {
- f, err := os.Open("efhisconv.json")
- if err != nil {
- return nil, err
- }
- defer f.Close()
- dec := json.NewDecoder(f)
- conf := &Conf{}
- err = dec.Decode(conf)
- if err != nil {
- return nil, err
- }
- return conf, nil
- }
- func combinEx(filename string, candles []market.Candle) ([]market.Candle, error) {
- buf, err := market.ReadCandleFile(filename)
- if err != nil {
- return candles, err
- }
- candles = append(buf, candles[:]...)
- sort.Sort(byTime(candles))
- return candles, nil
- }
|