broadcast.go 9.8 KB


  1. package fix
  2. import "fmt"
  3. import "net"
  4. import "encoding/binary"
  5. import "os"
  6. //import "syscall"
  7. //import "unsafe"
  8. import "time"
  9. //import "errors"
  10. import "sync"
  11. import _ "github.com/go-sql-driver/mysql"
  12. import "database/sql"
  13. import "database/model"
  14. const listenport = ":8898"
  15. const NUM_PER_SYMBOL = 10//0000
  16. const (
  17. EURUSD = iota
  18. GBPUSD
  19. USDJPY
  20. USDCHF
  21. AUDUSD
  22. USDCAD
  23. NZDUSD
  24. EURGBP
  25. EURJPY
  26. EURCHF
  27. EURAUD
  28. EURCAD
  29. GBPCHF
  30. GBPJPY
  31. CHFJPY
  32. CADJPY
  33. AUDJPY
  34. AUDCAD
  35. USDMXN
  36. AUDNZD
  37. XAGUSD
  38. XAUUSD
  39. OILUSD
  40. )
  41. type Tick struct {
  42. AskCount int32
  43. BidCount int32
  44. AskPrice []float64
  45. BidPrice []float64
  46. AskVolume []float64
  47. BidVolume []float64
  48. Symbol [8]byte
  49. Time int32
  50. Millisecond int32
  51. }
  52. type SymbolInfo struct {
  53. BufIndex int
  54. BufPos int
  55. }
  56. type ClientInfo struct{
  57. BufIndex int
  58. DataPos int
  59. DataNum int16
  60. }
  61. type ClientReqInfo struct{
  62. BeginTime int64
  63. Symbol int16
  64. DataNum int16
  65. }
  66. type TimeAndPos struct {
  67. Time int64
  68. Pos int
  69. }
  70. var broaddatach chan TimeAndPos
  71. var fileLog *os.File
  72. var fileIndex int64
  73. var connmapreal map[net.Conn] chan TimeAndPos
  74. var symbolinfomap map[string] SymbolInfo
  75. var symbolidmap map[int16] string
  76. var TickBuf [][NUM_PER_SYMBOL]TickFull
  77. var mutex sync.Mutex
  78. var db *sql.DB
  79. var groupcommit *model.SqlQueue
  80. func debug(a ...interface{}) (n int, err error) {
  81. mutex.Lock()
  82. fileinfo,_ := fileLog.Stat()
  83. if fileinfo.Size() >= 10*1024*1024 {
  84. fileLog.Close()
  85. fileIndex++
  86. filename := fmt.Sprintf("log%d.txt",fileIndex)
  87. fileLog, err = os.OpenFile(filename, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644)
  88. if err != nil {
  89. return -1,err
  90. }
  91. }
  92. mutex.Unlock()
  93. tmptime := time.Now()
  94. timestr := fmt.Sprintf("%02d-%02d %02d:%02d:%02d:%d",tmptime.Month(),tmptime.Day(),tmptime.Hour(),tmptime.Minute(),tmptime.Second(),tmptime.Nanosecond())
  95. return fmt.Fprintln(fileLog,timestr,a)
  96. }
  97. func FindPosition(databuf []TickFull,beginpos int,endpos int,time int64)(pos int,re int){
  98. begintime := GetTotalTime(databuf[beginpos].Time,databuf[beginpos].Millisecond)
  99. if time <= begintime {
  100. debug("time:",time,"databuf[beginpos].Time:",begintime)
  101. return beginpos,0
  102. }
  103. endtime := GetTotalTime(databuf[endpos].Time,databuf[endpos].Millisecond)
  104. if time >= endtime {
  105. debug("time:",time,"databuf[endpos].Time:",endtime)
  106. return endpos,0
  107. }
  108. midpos := beginpos + (endpos - beginpos)/2
  109. midtime := GetTotalTime(databuf[midpos].Time,databuf[midpos].Millisecond)
  110. if time == midtime {
  111. debug("midpos",midpos)
  112. return midpos,0
  113. }else{
  114. if endpos == (beginpos + 1) {
  115. debug("endpos:",endpos)
  116. return endpos,0
  117. }else {
  118. if time < midtime {
  119. debug("beginpos:",beginpos,"midpos:",midpos,"time:",time)
  120. return FindPosition(databuf,beginpos,midpos,time)
  121. }else{
  122. debug("midpos:",midpos,"endpos:",endpos,"time:",time)
  123. return FindPosition(databuf,midpos,endpos,time)
  124. }
  125. }
  126. }
  127. debug(-1,-1)
  128. return -1,-1
  129. }
  130. func ReadClientRequest(conn net.Conn){
  131. var cri ClientReqInfo
  132. err := binary.Read(conn, binary.LittleEndian, &cri)
  133. if err == nil {
  134. var ok bool
  135. var tmpsymbolinfo SymbolInfo
  136. var tmpsymbolstr string
  137. if cri.Symbol != -1 {
  138. tmpsymbolstr,ok = symbolidmap[cri.Symbol]
  139. if ok {
  140. tmpsymbolinfo,ok = symbolinfomap[tmpsymbolstr]
  141. }
  142. }
  143. if (cri.Symbol == -1) || (ok) {
  144. debug("begintime:",cri.BeginTime,"symbol:",cri.Symbol)
  145. var ci ClientInfo
  146. ci.BufIndex = tmpsymbolinfo.BufIndex
  147. ci.DataNum = cri.DataNum
  148. if cri.BeginTime == -1 || cri.BeginTime == 0 {
  149. if cri.BeginTime == -1 {
  150. ci.DataPos = tmpsymbolinfo.BufPos
  151. }else{
  152. ci.DataPos = 0
  153. }
  154. debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos)
  155. }else{
  156. pos,re := FindPosition(TickBuf[tmpsymbolinfo.BufIndex][:],0,tmpsymbolinfo.BufPos,cri.BeginTime)
  157. if re >= 0 {
  158. ci.DataPos = pos
  159. debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos)
  160. }else{
  161. ci.DataPos = 0
  162. debug("ci.BufIndex:",ci.BufIndex,"ci.DataPos:",ci.DataPos)
  163. }
  164. }
  165. clientch := make(chan TimeAndPos)
  166. connmapreal[conn] = clientch
  167. go SendData(conn,ci,clientch,cri.Symbol,tmpsymbolstr)
  168. var tmptimeandpos TimeAndPos
  169. tmptimeandpos.Time = GetTotalTime(TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos].Time,
  170. TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos].Millisecond)
  171. tmptimeandpos.Pos = tmpsymbolinfo.BufPos
  172. clientch <- tmptimeandpos
  173. }else{
  174. var retcode int32 = -1
  175. err := binary.Write(conn, binary.LittleEndian, &retcode)
  176. if err != nil {
  177. debug(err)
  178. }
  179. conn.Close()
  180. debug("symbol:",cri.Symbol)
  181. }
  182. }else{
  183. conn.Close()
  184. debug(err)
  185. }
  186. }
  187. func DoConnection(port string) {
  188. ln, err := net.Listen("tcp", port)
  189. if err != nil {
  190. panic(err)
  191. }
  192. for {
  193. conn, err := ln.Accept()
  194. if err != nil {
  195. fmt.Println(err)
  196. continue
  197. }
  198. debug("get client conn")
  199. debug("link begin",conn.RemoteAddr().Network(),conn.RemoteAddr().String())
  200. go ReadClientRequest(conn)
  201. }
  202. }
  203. func GetTotalTime(Time int32,Millisecond int32)(TotalTime int64){
  204. TotalTime = int64(Time)
  205. TotalTime <<= 32
  206. TotalTime += int64(Millisecond)
  207. return
  208. }
  209. func ReadData(sourdatach chan *TickFull){
  210. for {
  211. tf := <-sourdatach
  212. tmpsymbolstr := string(tf.Symbol[:])
  213. tmpsymbolinfo,ok := symbolinfomap[tmpsymbolstr]
  214. if !ok {
  215. tmpsymbolinfo.BufIndex = len(TickBuf)
  216. tmpsymbolinfo.BufPos = 0
  217. symbolinfomap[tmpsymbolstr] = tmpsymbolinfo
  218. tmpsymbolid := int16(tmpsymbolinfo.BufIndex)
  219. symbolidmap[tmpsymbolid] = tmpsymbolstr
  220. var tmpbuf [NUM_PER_SYMBOL]TickFull
  221. TickBuf = append(TickBuf,tmpbuf)
  222. //debug(tmpsymbolstr,tmpsymbolid,tmpsymbolinfo)
  223. }else{
  224. tmpsymbolinfo.BufPos++
  225. if tmpsymbolinfo.BufPos >= NUM_PER_SYMBOL {
  226. copy(TickBuf[tmpsymbolinfo.BufIndex][0:NUM_PER_SYMBOL/2],TickBuf[tmpsymbolinfo.BufIndex][NUM_PER_SYMBOL/2:])
  227. tmpsymbolinfo.BufPos = NUM_PER_SYMBOL/2
  228. }
  229. symbolinfomap[tmpsymbolstr] = tmpsymbolinfo
  230. //debug(tmpsymbolstr,tmpsymbolinfo)
  231. }
  232. TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos] = *tf
  233. //fmt.Println(TickBuf[CurBufIndex][CurDataPos].Tag,TickBuf[CurBufIndex][CurDataPos].Period,TickBuf[CurBufIndex][CurDataPos].Time,TickBuf[CurBufIndex][CurDataPos].MSec,TickBuf[CurBufIndex][CurDataPos].Symbol,TickBuf[CurBufIndex][CurDataPos].Bid,TickBuf[CurBufIndex][CurDataPos].Ask,TickBuf[CurBufIndex][CurDataPos].High,TickBuf[CurBufIndex][CurDataPos].Low,TickBuf[CurBufIndex][CurDataPos].Volume,"\n")
  234. //debug(tmpsymbolinfo.BufIndex,tmpsymbolinfo.BufPos - 1,TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos - 1].Time,TickBuf[tmpsymbolinfo.BufIndex][tmpsymbolinfo.BufPos - 1].Millisecond)
  235. var curinfo TimeAndPos
  236. curinfo.Time = GetTotalTime(tf.Time,tf.Millisecond)
  237. curinfo.Pos = tmpsymbolinfo.BufPos
  238. //debug(curinfo)
  239. if tmpsymbolstr[:4] == "4001" {
  240. debug(tmpsymbolinfo.BufPos)
  241. debug(TickBuf[0][0].Time,TickBuf[0][1].Time,TickBuf[0][2].Time,TickBuf[0][3].Time,TickBuf[0][4].Time,TickBuf[0][5].Time,
  242. TickBuf[0][6].Time,TickBuf[0][7].Time,TickBuf[0][8].Time,TickBuf[0][9].Time)
  243. }
  244. errdb := groupcommit.ExecStruct(tf, func (elm *model.SqlElement) {})
  245. if errdb != nil {
  246. debug("db error",errdb)
  247. }
  248. broaddatach <- curinfo
  249. }
  250. }
  251. func GetSendData(tf TickFull,num int16)(ti Tick){
  252. if tf.AskCount > int32(num) {
  253. ti.AskCount = int32(num)
  254. }
  255. if tf.BidCount > int32(num) {
  256. ti.BidCount = int32(num)
  257. }
  258. ti.AskPrice = make([]float64,ti.AskCount)
  259. ti.AskPrice = tf.AskPrice[0:ti.AskCount]
  260. ti.BidPrice = make([]float64,ti.BidCount)
  261. ti.BidPrice = tf.BidPrice[0:ti.BidCount]
  262. ti.AskVolume = make([]float64,ti.AskCount)
  263. ti.AskVolume = tf.AskVolume[0:ti.AskCount]
  264. ti.BidVolume = make([]float64,ti.BidCount)
  265. ti.BidVolume = tf.BidVolume[0:ti.BidCount]
  266. copy(ti.Symbol[:],tf.Symbol[:])
  267. ti.Time = tf.Time
  268. ti.Millisecond = tf.Millisecond
  269. return
  270. }
  271. func SendData(conn net.Conn,ci ClientInfo,ch chan TimeAndPos,symbol int16,symbolstr string){
  272. for{
  273. CurInfo := <-ch
  274. clienttime := GetTotalTime(TickBuf[ci.BufIndex][ci.DataPos].Time,TickBuf[ci.BufIndex][ci.DataPos].Millisecond)
  275. for ;clienttime <= CurInfo.Time; {
  276. if symbol == -1 || symbolstr == string(TickBuf[ci.BufIndex][ci.DataPos].Symbol[:]) {
  277. tmptick := GetSendData(TickBuf[ci.BufIndex][ci.DataPos],ci.DataNum)
  278. err := binary.Write(conn, binary.LittleEndian, &tmptick)
  279. if err != nil {
  280. debug("link end",err,conn.RemoteAddr().Network(),conn.RemoteAddr().String())
  281. delete(connmapreal,conn)
  282. conn.Close()
  283. return
  284. }
  285. }
  286. needbreak := false
  287. if clienttime == CurInfo.Time {
  288. needbreak = true
  289. }
  290. ci.DataPos = ci.DataPos + 1
  291. if ci.DataPos > CurInfo.Pos {
  292. ci.DataPos = 0
  293. }
  294. if needbreak {
  295. break
  296. }
  297. }
  298. }
  299. }
  300. func BroadcastData(){
  301. for {
  302. curtime := <-broaddatach
  303. for _,clientch := range connmapreal {
  304. select {
  305. case clientch <- curtime:
  306. default:
  307. }
  308. }
  309. }
  310. }
  311. func InitBroadcast(sourdatach chan *TickFull) {
  312. connmapreal = make(map[net.Conn] chan TimeAndPos)
  313. symbolinfomap = make(map[string] SymbolInfo)
  314. symbolidmap = make(map[int16] string)
  315. broaddatach = make(chan TimeAndPos)
  316. filename := fmt.Sprintf("log%d.txt",fileIndex)
  317. fileLog, _ = os.OpenFile(filename, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644)
  318. var err error
  319. db,err = sql.Open("mysql","wr_mtuser:jDfSL6e9ZSCQDySP@tcp(211.155.230.186:3306)/wr_mtuser?charset=utf8")
  320. if err != nil {
  321. debug(err)
  322. return
  323. }
  324. groupcommit, err = model.NewSqlQueue(db, 10000, 50 * time.Millisecond)
  325. if err != nil {
  326. debug(err)
  327. return
  328. }
  329. groupcommit.SetExecConf("FixData", "")
  330. go DoConnection(listenport)
  331. go ReadData(sourdatach)
  332. go BroadcastData()
  333. //select{}
  334. }