package fix import "fmt" import "net" import "encoding/binary" import "os" //import "syscall" //import "unsafe" import "time" //import "errors" import "sync" import _ "github.com/go-sql-driver/mysql" import "database/sql" import "database/model" const listenport = ":8898" const NUM_PER_SYMBOL = 10//0000 const ( EURUSD = iota GBPUSD USDJPY USDCHF AUDUSD USDCAD NZDUSD EURGBP EURJPY EURCHF EURAUD EURCAD GBPCHF GBPJPY CHFJPY CADJPY AUDJPY AUDCAD USDMXN AUDNZD XAGUSD XAUUSD OILUSD ) type Tick struct { AskCount int32 BidCount int32 AskPrice []float64 BidPrice []float64 AskVolume []float64 BidVolume []float64 Symbol [8]byte Time int32 Millisecond int32 } type SymbolInfo struct { BufIndex int BufPos int } type ClientInfo struct{ BufIndex int DataPos int DataNum int16 } type ClientReqInfo struct{ BeginTime int64 Symbol int16 DataNum int16 } type TimeAndPos struct { Time int64 Pos int } var broaddatach chan TimeAndPos var fileLog *os.File var fileIndex int64 var connmapreal map[net.Conn] chan TimeAndPos var symbolinfomap map[string] SymbolInfo var symbolidmap map[int16] string var TickBuf [][NUM_PER_SYMBOL]TickFull var mutex sync.Mutex var db *sql.DB var groupcommit *model.SqlQueue func debug(a ...interface{}) (n int, err error) { mutex.Lock() fileinfo,_ := fileLog.Stat() if fileinfo.Size() >= 10*1024*1024 { fileLog.Close() fileIndex++ filename := fmt.Sprintf("log%d.txt",fileIndex) fileLog, err = os.OpenFile(filename, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644) if err != nil { return -1,err } } mutex.Unlock() tmptime := time.Now() timestr := fmt.Sprintf("%02d-%02d %02d:%02d:%02d:%d",tmptime.Month(),tmptime.Day(),tmptime.Hour(),tmptime.Minute(),tmptime.Second(),tmptime.Nanosecond()) return fmt.Fprintln(fileLog,timestr,a) } func FindPosition(databuf []TickFull,beginpos int,endpos int,time int64)(pos int,re int){ begintime := GetTotalTime(databuf[beginpos].Time,databuf[beginpos].Millisecond) if time <= begintime { debug("time:",time,"databuf[beginpos].Time:",begintime) return beginpos,0 } endtime := GetTotalTime(databuf[endpos].Time,databuf[endpos].Millisecond) if time >= endtime { debug("time:",time,"databuf[endpos].Time:",endtime) return endpos,0 } midpos := beginpos + (endpos - beginpos)/2 midtime := GetTotalTime(databuf[midpos].Time,databuf[midpos].Millisecond) if time == midtime { debug("midpos",midpos) return midpos,0 }else{ if endpos == (beginpos + 1) { debug("endpos:",endpos) return endpos,0 }else { if time < midtime { debug("beginpos:",beginpos,"midpos:",midpos,"time:",time) return FindPosition(databuf,beginpos,midpos,time) }else{ debug("midpos:",midpos,"endpos:",endpos,"time:",time) return FindPosition(databuf,midpos,endpos,time) } } } debug(-1,-1) return -1,-1 } func ReadClientRequest(conn net.Conn){ var cri ClientReqInfo err := binary.Read(conn, binary.LittleEndian, &cri) if err == nil { var ok bool var tmpsymbolinfo SymbolInfo var tmpsymbolstr string if cri.Symbol != -1 { tmpsymbolstr,ok = symbolidmap[cri.Symbol] if ok { tmpsymbolinfo,ok = symbolinfomap[tmpsymbolstr] } } if (cri.Symbol == -1) || (ok) { debug("begintime:",cri.BeginTime,"symbol:",cri.Symbol) var ci ClientInfo ci.BufIndex = tmpsymbolinfo.BufIndex ci.DataNum = cri.DataNum if cri.BeginTime == -1 || cri.BeginTime == 0 { if cri.BeginTime == -1 { ci.DataPos = tmpsymbolinfo.BufPos }else{ ci.DataPos = 0 } debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos) }else{ pos,re := FindPosition(TickBuf[tmpsymbolinfo.BufIndex][:],0,tmpsymbolinfo.BufPos,cri.BeginTime) if re >= 0 { ci.DataPos = pos debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos) }else{ ci.DataPos = 0 debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos) } } clientch := make(chan TimeAndPos) connmapreal[conn] = clientch go SendData(conn,ci,clientch,cri.Symbol,tmpsymbolstr) var tmptimeandpos TimeAndPos tmptimeandpos.Time = GetTotalTime(TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos].Time, TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos].Millisecond) tmptimeandpos.Pos = tmpsymbolinfo.BufPos clientch <- tmptimeandpos }else{ var retcode int32 = -1 err := binary.Write(conn, binary.LittleEndian, &retcode) if err != nil { debug(err) } conn.Close() debug("symbol:",cri.Symbol) } }else{ conn.Close() debug(err) } } func DoConnection(port string) { ln, err := net.Listen("tcp", port) if err != nil { panic(err) } for { conn, err := ln.Accept() if err != nil { fmt.Println(err) continue } debug("get client conn") debug("link begin",conn.RemoteAddr().Network(),conn.RemoteAddr().String()) go ReadClientRequest(conn) } } func GetTotalTime(Time int32,Millisecond int32)(TotalTime int64){ TotalTime = int64(Time) TotalTime <<= 32 TotalTime += int64(Millisecond) return } func ReadData(sourdatach chan *TickFull){ for { tf := <-sourdatach tmpsymbolstr := string(tf.Symbol[:]) tmpsymbolinfo,ok := symbolinfomap[tmpsymbolstr] if !ok { tmpsymbolinfo.BufIndex = len(TickBuf) tmpsymbolinfo.BufPos = 0 symbolinfomap[tmpsymbolstr] = tmpsymbolinfo tmpsymbolid := int16(tmpsymbolinfo.BufIndex) symbolidmap[tmpsymbolid] = tmpsymbolstr var tmpbuf [NUM_PER_SYMBOL]TickFull TickBuf = append(TickBuf,tmpbuf) //debug(tmpsymbolstr,tmpsymbolid,tmpsymbolinfo) }else{ tmpsymbolinfo.BufPos++ if tmpsymbolinfo.BufPos >= NUM_PER_SYMBOL { copy(TickBuf[tmpsymbolinfo.BufIndex][0:NUM_PER_SYMBOL/2],TickBuf[tmpsymbolinfo.BufIndex][NUM_PER_SYMBOL/2:]) tmpsymbolinfo.BufPos = NUM_PER_SYMBOL/2 } symbolinfomap[tmpsymbolstr] = tmpsymbolinfo //debug(tmpsymbolstr,tmpsymbolinfo) } TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos] = *tf //fmt.Println(TickBuf[CurBufIndex][CurDataPos].Tag,TickBuf[CurBufIndex][CurDataPos].Period,TickBuf[CurBufIndex][CurDataPos].Time,TickBuf[CurBufIndex][CurDataPos].MSec,TickBuf[CurBufIndex][CurDataPos].Symbol,TickBuf[CurBufIndex][CurDataPos].Bid,TickBuf[CurBufIndex][CurDataPos].Ask,TickBuf[CurBufIndex][CurDataPos].High,TickBuf[CurBufIndex][CurDataPos].Low,TickBuf[CurBufIndex][CurDataPos].Volume,"\n") //debug(tmpsymbolinfo.BufIndex,tmpsymbolinfo.BufPos - 1,TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos - 1].Time,TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos - 1].Millisecond) var curinfo TimeAndPos curinfo.Time = GetTotalTime(tf.Time,tf.Millisecond) curinfo.Pos = tmpsymbolinfo.BufPos //debug(curinfo) if tmpsymbolstr[:4] == "4001" { debug(tmpsymbolinfo.BufPos) debug(TickBuf[0][0].Time,TickBuf[0][1].Time,TickBuf[0][2].Time,TickBuf[0][3].Time,TickBuf[0][4].Time,TickBuf[0][5].Time, TickBuf[0][6].Time,TickBuf[0][7].Time,TickBuf[0][8].Time,TickBuf[0][9].Time) } errdb := groupcommit.ExecStruct(tf, func (elm *model.SqlElement) {}) if errdb != nil { debug("db error",errdb) } broaddatach <- curinfo } } func GetSendData(tf TickFull,num int16)(ti Tick){ if tf.AskCount > int32(num) { ti.AskCount = int32(num) } if tf.BidCount > int32(num) { ti.BidCount = int32(num) } ti.AskPrice = make([]float64,ti.AskCount) ti.AskPrice = tf.AskPrice[0:ti.AskCount] ti.BidPrice = make([]float64,ti.BidCount) ti.BidPrice = tf.BidPrice[0:ti.BidCount] ti.AskVolume = make([]float64,ti.AskCount) ti.AskVolume = tf.AskVolume[0:ti.AskCount] ti.BidVolume = make([]float64,ti.BidCount) ti.BidVolume = tf.BidVolume[0:ti.BidCount] copy(ti.Symbol[:],tf.Symbol[:]) ti.Time = tf.Time ti.Millisecond = tf.Millisecond return } func SendData(conn net.Conn,ci ClientInfo,ch chan TimeAndPos,symbol int16,symbolstr string){ for{ CurInfo := <-ch clienttime := GetTotalTime(TickBuf[ci.BufIndex][ci.DataPos].Time,TickBuf[ci.BufIndex][ci.DataPos].Millisecond) for ;clienttime <= CurInfo.Time; { if symbol == -1 || symbolstr == string(TickBuf[ci.BufIndex][ci.DataPos].Symbol[:]) { tmptick := GetSendData(TickBuf[ci.BufIndex][ci.DataPos],ci.DataNum) err := binary.Write(conn, binary.LittleEndian, &tmptick) if err != nil { debug("link end",err,conn.RemoteAddr().Network(),conn.RemoteAddr().String()) delete(connmapreal,conn) conn.Close() return } } needbreak := false if clienttime == CurInfo.Time { needbreak = true } ci.DataPos = ci.DataPos + 1 if ci.DataPos > CurInfo.Pos { ci.DataPos = 0 } if needbreak { break } } } } func BroadcastData(){ for { curtime := <-broaddatach for _,clientch := range connmapreal { select { case clientch <- curtime: default: } } } } func InitBroadcast(sourdatach chan *TickFull) { connmapreal = make(map[net.Conn] chan TimeAndPos) symbolinfomap = make(map[string] SymbolInfo) symbolidmap = make(map[int16] string) broaddatach = make(chan TimeAndPos) filename := fmt.Sprintf("log%d.txt",fileIndex) fileLog, _ = os.OpenFile(filename, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644) var err error db,err = sql.Open("mysql","wr_mtuser:jDfSL6e9ZSCQDySP@tcp(211.155.230.186:3306)/wr_mtuser?charset=utf8") if err != nil { debug(err) return } groupcommit, err = model.NewSqlQueue(db, 10000, 50 * time.Millisecond) if err != nil { debug(err) return } groupcommit.SetExecConf("FixData", "") go DoConnection(listenport) go ReadData(sourdatach) go BroadcastData() //select{} }