123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- package fix
- import "fmt"
- import "net"
- import "encoding/binary"
- import "os"
- //import "syscall"
- //import "unsafe"
- import "time"
- //import "errors"
- import "sync"
- import _ "github.com/go-sql-driver/mysql"
- import "database/sql"
- import "database/model"
// Server-wide settings.
const (
	// listenport is the TCP listen address handed to DoConnection by
	// InitBroadcast.
	listenport = ":8898"

	// NUM_PER_SYMBOL is the length of each per-symbol row in TickBuf. ReadData
	// compacts a row (keeping its newer half) once the row is full.
	// NOTE(review): the original line read "10//0000", so the intended
	// production value was presumably 100000 - confirm before deploying.
	NUM_PER_SYMBOL = 10
)
// Numeric identifiers for the supported instruments. The values are assigned
// consecutively from 0 in declaration order, so the order below must not be
// changed.
const (
	// Currency pairs.
	EURUSD = iota
	GBPUSD
	USDJPY
	USDCHF
	AUDUSD
	USDCAD
	NZDUSD
	EURGBP
	EURJPY
	EURCHF
	EURAUD
	EURCAD
	GBPCHF
	GBPJPY
	CHFJPY
	CADJPY
	AUDJPY
	AUDCAD
	USDMXN
	AUDNZD

	// Non-currency instruments (presumably silver, gold and oil quoted in
	// USD - confirm against the data feed).
	XAGUSD
	XAUUSD
	OILUSD
)
// Tick is the quote record built by GetSendData for delivery to a client.
// It carries a (possibly truncated) view of the price levels of a TickFull.
type Tick struct {
	// Number of ask / bid levels carried in the slices below.
	AskCount, BidCount int32

	// Per-level prices and volumes; the ask slices are sized by AskCount and
	// the bid slices by BidCount.
	AskPrice, BidPrice, AskVolume, BidVolume []float64

	// Symbol is copied verbatim from the source TickFull.
	Symbol [8]byte

	// Time and Millisecond are copied from the source TickFull; together they
	// form the ordering key computed by GetTotalTime.
	Time, Millisecond int32
}
// SymbolInfo records where a symbol's ticks live inside TickBuf.
type SymbolInfo struct {
	// BufIndex selects the symbol's row in TickBuf; BufPos is the slot in
	// that row that ReadData wrote most recently.
	BufIndex, BufPos int
}
// ClientInfo is the per-connection cursor that SendData advances while
// streaming ticks to one client.
type ClientInfo struct {
	// BufIndex selects the row of TickBuf being streamed; DataPos is the
	// slot in that row the client is currently at.
	BufIndex, DataPos int

	// DataNum is the level limit from the client's request; it is passed to
	// GetSendData for every tick.
	DataNum int16
}
// ClientReqInfo is the subscription request a client sends right after
// connecting. ReadClientRequest decodes it with binary.Read in little-endian
// order, so the field order and widths below define the wire layout.
type ClientReqInfo struct {
	// BeginTime selects the starting point: -1 starts at the symbol's newest
	// tick, 0 starts at slot 0, and any other value is looked up with
	// FindPosition against GetTotalTime keys.
	BeginTime int64

	// Symbol is a key of symbolidmap, or -1 for no symbol filter.
	// DataNum limits the number of price levels sent per tick.
	Symbol, DataNum int16
}
// TimeAndPos is the notification passed from ReadData through BroadcastData
// to each SendData goroutine when a tick has been stored.
type TimeAndPos struct {
	// Time is the tick's GetTotalTime key.
	Time int64
	// Pos is the slot in the symbol's TickBuf row that received the tick.
	Pos int
}
- var broaddatach chan TimeAndPos
- var fileLog *os.File
- var fileIndex int64
- var connmapreal map[net.Conn] chan TimeAndPos
- var symbolinfomap map[string] SymbolInfo
- var symbolidmap map[int16] string
- var TickBuf [][NUM_PER_SYMBOL]TickFull
- var mutex sync.Mutex
- var db *sql.DB
- var groupcommit *model.SqlQueue
- func debug(a ...interface{}) (n int, err error) {
- mutex.Lock()
- fileinfo,_ := fileLog.Stat()
- if fileinfo.Size() >= 10*1024*1024 {
- fileLog.Close()
- fileIndex++
- filename := fmt.Sprintf("log%d.txt",fileIndex)
- fileLog, err = os.OpenFile(filename, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644)
- if err != nil {
- return -1,err
- }
- }
- mutex.Unlock()
- tmptime := time.Now()
- timestr := fmt.Sprintf("%02d-%02d %02d:%02d:%02d:%d",tmptime.Month(),tmptime.Day(),tmptime.Hour(),tmptime.Minute(),tmptime.Second(),tmptime.Nanosecond())
- return fmt.Fprintln(fileLog,timestr,a)
- }
- func FindPosition(databuf []TickFull,beginpos int,endpos int,time int64)(pos int,re int){
- begintime := GetTotalTime(databuf[beginpos].Time,databuf[beginpos].Millisecond)
- if time <= begintime {
- debug("time:",time,"databuf[beginpos].Time:",begintime)
- return beginpos,0
- }
- endtime := GetTotalTime(databuf[endpos].Time,databuf[endpos].Millisecond)
- if time >= endtime {
- debug("time:",time,"databuf[endpos].Time:",endtime)
- return endpos,0
- }
- midpos := beginpos + (endpos - beginpos)/2
- midtime := GetTotalTime(databuf[midpos].Time,databuf[midpos].Millisecond)
- if time == midtime {
- debug("midpos",midpos)
- return midpos,0
- }else{
- if endpos == (beginpos + 1) {
- debug("endpos:",endpos)
- return endpos,0
- }else {
- if time < midtime {
- debug("beginpos:",beginpos,"midpos:",midpos,"time:",time)
- return FindPosition(databuf,beginpos,midpos,time)
- }else{
- debug("midpos:",midpos,"endpos:",endpos,"time:",time)
- return FindPosition(databuf,midpos,endpos,time)
- }
- }
- }
- debug(-1,-1)
- return -1,-1
- }
- func ReadClientRequest(conn net.Conn){
- var cri ClientReqInfo
- err := binary.Read(conn, binary.LittleEndian, &cri)
- if err == nil {
- var ok bool
- var tmpsymbolinfo SymbolInfo
- var tmpsymbolstr string
- if cri.Symbol != -1 {
- tmpsymbolstr,ok = symbolidmap[cri.Symbol]
- if ok {
- tmpsymbolinfo,ok = symbolinfomap[tmpsymbolstr]
- }
- }
- if (cri.Symbol == -1) || (ok) {
- debug("begintime:",cri.BeginTime,"symbol:",cri.Symbol)
- var ci ClientInfo
- ci.BufIndex = tmpsymbolinfo.BufIndex
- ci.DataNum = cri.DataNum
- if cri.BeginTime == -1 || cri.BeginTime == 0 {
- if cri.BeginTime == -1 {
- ci.DataPos = tmpsymbolinfo.BufPos
- }else{
- ci.DataPos = 0
- }
- debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos)
- }else{
- pos,re := FindPosition(TickBuf[tmpsymbolinfo.BufIndex][:],0,tmpsymbolinfo.BufPos,cri.BeginTime)
- if re >= 0 {
- ci.DataPos = pos
- debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos)
- }else{
- ci.DataPos = 0
- debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos)
- }
- }
- clientch := make(chan TimeAndPos)
- connmapreal[conn] = clientch
- go SendData(conn,ci,clientch,cri.Symbol,tmpsymbolstr)
- var tmptimeandpos TimeAndPos
- tmptimeandpos.Time = GetTotalTime(TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos].Time,
- TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos].Millisecond)
- tmptimeandpos.Pos = tmpsymbolinfo.BufPos
- clientch <- tmptimeandpos
- }else{
- var retcode int32 = -1
- err := binary.Write(conn, binary.LittleEndian, &retcode)
- if err != nil {
- debug(err)
- }
- conn.Close()
- debug("symbol:",cri.Symbol)
- }
- }else{
- conn.Close()
- debug(err)
- }
- }
- func DoConnection(port string) {
- ln, err := net.Listen("tcp", port)
- if err != nil {
- panic(err)
- }
- for {
- conn, err := ln.Accept()
- if err != nil {
- fmt.Println(err)
- continue
- }
- debug("get client conn")
- debug("link begin",conn.RemoteAddr().Network(),conn.RemoteAddr().String())
- go ReadClientRequest(conn)
- }
- }
// GetTotalTime combines a tick's two time fields into one int64 ordering key:
// Time is shifted into the upper 32 bits and Millisecond is added to it.
func GetTotalTime(Time int32, Millisecond int32) (TotalTime int64) {
	return int64(Time)<<32 + int64(Millisecond)
}
- func ReadData(sourdatach chan *TickFull){
- for {
- tf := <-sourdatach
- tmpsymbolstr := string(tf.Symbol[:])
- tmpsymbolinfo,ok := symbolinfomap[tmpsymbolstr]
- if !ok {
- tmpsymbolinfo.BufIndex = len(TickBuf)
- tmpsymbolinfo.BufPos = 0
- symbolinfomap[tmpsymbolstr] = tmpsymbolinfo
- tmpsymbolid := int16(tmpsymbolinfo.BufIndex)
- symbolidmap[tmpsymbolid] = tmpsymbolstr
- var tmpbuf [NUM_PER_SYMBOL]TickFull
- TickBuf = append(TickBuf,tmpbuf)
- //debug(tmpsymbolstr,tmpsymbolid,tmpsymbolinfo)
- }else{
- tmpsymbolinfo.BufPos++
- if tmpsymbolinfo.BufPos >= NUM_PER_SYMBOL {
- copy(TickBuf[tmpsymbolinfo.BufIndex][0:NUM_PER_SYMBOL/2],TickBuf[tmpsymbolinfo.BufIndex][NUM_PER_SYMBOL/2:])
- tmpsymbolinfo.BufPos = NUM_PER_SYMBOL/2
- }
- symbolinfomap[tmpsymbolstr] = tmpsymbolinfo
- //debug(tmpsymbolstr,tmpsymbolinfo)
- }
- TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos] = *tf
- //fmt.Println(TickBuf[CurBufIndex][CurDataPos].Tag,TickBuf[CurBufIndex][CurDataPos].Period,TickBuf[CurBufIndex][CurDataPos].Time,TickBuf[CurBufIndex][CurDataPos].MSec,TickBuf[CurBufIndex][CurDataPos].Symbol,TickBuf[CurBufIndex][CurDataPos].Bid,TickBuf[CurBufIndex][CurDataPos].Ask,TickBuf[CurBufIndex][CurDataPos].High,TickBuf[CurBufIndex][CurDataPos].Low,TickBuf[CurBufIndex][CurDataPos].Volume,"\n")
- //debug(tmpsymbolinfo.BufIndex,tmpsymbolinfo.BufPos - 1,TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos - 1].Time,TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos - 1].Millisecond)
- var curinfo TimeAndPos
- curinfo.Time = GetTotalTime(tf.Time,tf.Millisecond)
- curinfo.Pos = tmpsymbolinfo.BufPos
- //debug(curinfo)
- if tmpsymbolstr[:4] == "4001" {
- debug(tmpsymbolinfo.BufPos)
- debug(TickBuf[0][0].Time,TickBuf[0][1].Time,TickBuf[0][2].Time,TickBuf[0][3].Time,TickBuf[0][4].Time,TickBuf[0][5].Time,
- TickBuf[0][6].Time,TickBuf[0][7].Time,TickBuf[0][8].Time,TickBuf[0][9].Time)
- }
- errdb := groupcommit.ExecStruct(tf, func (elm *model.SqlElement) {})
- if errdb != nil {
- debug("db error",errdb)
- }
- broaddatach <- curinfo
- }
- }
- func GetSendData(tf TickFull,num int16)(ti Tick){
- if tf.AskCount > int32(num) {
- ti.AskCount = int32(num)
- }
- if tf.BidCount > int32(num) {
- ti.BidCount = int32(num)
- }
- ti.AskPrice = make([]float64,ti.AskCount)
- ti.AskPrice = tf.AskPrice[0:ti.AskCount]
- ti.BidPrice = make([]float64,ti.BidCount)
- ti.BidPrice = tf.BidPrice[0:ti.BidCount]
- ti.AskVolume = make([]float64,ti.AskCount)
- ti.AskVolume = tf.AskVolume[0:ti.AskCount]
- ti.BidVolume = make([]float64,ti.BidCount)
- ti.BidVolume = tf.BidVolume[0:ti.BidCount]
- copy(ti.Symbol[:],tf.Symbol[:])
- ti.Time = tf.Time
- ti.Millisecond = tf.Millisecond
- return
- }
- func SendData(conn net.Conn,ci ClientInfo,ch chan TimeAndPos,symbol int16,symbolstr string){
- for{
- CurInfo := <-ch
- clienttime := GetTotalTime(TickBuf[ci.BufIndex][ci.DataPos].Time,TickBuf[ci.BufIndex][ci.DataPos].Millisecond)
- for ;clienttime <= CurInfo.Time; {
- if symbol == -1 || symbolstr == string(TickBuf[ci.BufIndex][ci.DataPos].Symbol[:]) {
- tmptick := GetSendData(TickBuf[ci.BufIndex][ci.DataPos],ci.DataNum)
- err := binary.Write(conn, binary.LittleEndian, &tmptick)
- if err != nil {
- debug("link end",err,conn.RemoteAddr().Network(),conn.RemoteAddr().String())
- delete(connmapreal,conn)
- conn.Close()
- return
- }
- }
- needbreak := false
- if clienttime == CurInfo.Time {
- needbreak = true
- }
- ci.DataPos = ci.DataPos + 1
- if ci.DataPos > CurInfo.Pos {
- ci.DataPos = 0
- }
- if needbreak {
- break
- }
- }
- }
- }
- func BroadcastData(){
- for {
- curtime := <-broaddatach
- for _,clientch := range connmapreal {
- select {
- case clientch <- curtime:
- default:
- }
- }
- }
- }
- func InitBroadcast(sourdatach chan *TickFull) {
- connmapreal = make(map[net.Conn] chan TimeAndPos)
- symbolinfomap = make(map[string] SymbolInfo)
- symbolidmap = make(map[int16] string)
- broaddatach = make(chan TimeAndPos)
- filename := fmt.Sprintf("log%d.txt",fileIndex)
- fileLog, _ = os.OpenFile(filename, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644)
- var err error
- db,err = sql.Open("mysql","wr_mtuser:jDfSL6e9ZSCQDySP@tcp(211.155.230.186:3306)/wr_mtuser?charset=utf8")
- if err != nil {
- debug(err)
- return
- }
- groupcommit, err = model.NewSqlQueue(db, 10000, 50 * time.Millisecond)
- if err != nil {
- debug(err)
- return
- }
- groupcommit.SetExecConf("FixData", "")
- go DoConnection(listenport)
- go ReadData(sourdatach)
- go BroadcastData()
- //select{}
- }
|