Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(28)

Delta Between Two Patch Sets: src/pkg/websocket/websocket.go

Issue 5058043: code review 5058043: websocket: add mutex to make websocket full-duplex
Left Patch Set: Created 13 years, 6 months ago
Right Patch Set: diff -r 49696534f8b7 https://go.googlecode.com/hg/ Created 13 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Right: Side by side diff | Download
« no previous file with change/comment | « src/pkg/websocket/hybi.go ('k') | no next file » | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
(no file at all)
1 // Copyright 2009 The Go Authors. All rights reserved. 1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 // Package websocket implements a client and server for the WebSocket protocol. 5 // Package websocket implements a client and server for the WebSocket protocol.
6 // The protocol is defined at http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol 6 // The protocol is defined at http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol
7 package websocket 7 package websocket
8 8
9 import ( 9 import (
10 "bufio" 10 "bufio"
11 "crypto/tls" 11 "crypto/tls"
12 "http" 12 "http"
13 "io" 13 "io"
14 "io/ioutil" 14 "io/ioutil"
15 "json" 15 "json"
16 "net" 16 "net"
17 "os" 17 "os"
18 "sync"
18 "url" 19 "url"
19 ) 20 )
20 21
21 const ( 22 const (
22 ProtocolVersionHixie75 = -75 23 ProtocolVersionHixie75 = -75
23 ProtocolVersionHixie76 = -76 24 ProtocolVersionHixie76 = -76
24 ProtocolVersionHybi00 = 0 25 ProtocolVersionHybi00 = 0
25 ProtocolVersionHybi = 8 26 ProtocolVersionHybi = 8
26 27
27 ContinuationFrame = 0 28 ContinuationFrame = 0
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 } 141 }
141 142
142 // Conn represents a WebSocket connection. 143 // Conn represents a WebSocket connection.
143 type Conn struct { 144 type Conn struct {
144 config *Config 145 config *Config
145 request *http.Request 146 request *http.Request
146 147
147 buf *bufio.ReadWriter 148 buf *bufio.ReadWriter
148 rwc io.ReadWriteCloser 149 rwc io.ReadWriteCloser
149 150
151 rio sync.Mutex
150 frameReaderFactory 152 frameReaderFactory
151 frameReader 153 frameReader
152 154
155 wio sync.Mutex
153 frameWriterFactory 156 frameWriterFactory
154 157
155 frameHandler 158 frameHandler
156 PayloadType byte 159 PayloadType byte
157 defaultCloseStatus int 160 defaultCloseStatus int
158 } 161 }
159 162
160 // Read implements the io.Reader interface: 163 // Read implements the io.Reader interface:
161 // it reads data of a frame from the WebSocket connection. 164 // it reads data of a frame from the WebSocket connection.
162 // if msg is not large enough for the frame data, it fills the msg and next Read 165 // if msg is not large enough for the frame data, it fills the msg and next Read
163 // will read the rest of the frame data. 166 // will read the rest of the frame data.
164 // it reads Text frame or Binary frame. 167 // it reads Text frame or Binary frame.
165 func (ws *Conn) Read(msg []byte) (n int, err os.Error) { 168 func (ws *Conn) Read(msg []byte) (n int, err os.Error) {
169 ws.rio.Lock()
170 defer ws.rio.Unlock()
166 again: 171 again:
167 if ws.frameReader == nil { 172 if ws.frameReader == nil {
168 frame, err := ws.frameReaderFactory.NewFrameReader() 173 frame, err := ws.frameReaderFactory.NewFrameReader()
169 if err != nil { 174 if err != nil {
170 return 0, err 175 return 0, err
171 } 176 }
172 ws.frameReader, err = ws.frameHandler.HandleFrame(frame) 177 ws.frameReader, err = ws.frameHandler.HandleFrame(frame)
173 if err != nil { 178 if err != nil {
174 return 0, err 179 return 0, err
175 } 180 }
176 if ws.frameReader == nil { 181 if ws.frameReader == nil {
177 goto again 182 goto again
178 } 183 }
179 } 184 }
180 n, err = ws.frameReader.Read(msg) 185 n, err = ws.frameReader.Read(msg)
181 if err == os.EOF { 186 if err == os.EOF {
182 if trailer := ws.frameReader.TrailerReader(); trailer != nil { 187 if trailer := ws.frameReader.TrailerReader(); trailer != nil {
183 io.Copy(ioutil.Discard, trailer) 188 io.Copy(ioutil.Discard, trailer)
184 } 189 }
185 ws.frameReader = nil 190 ws.frameReader = nil
186 goto again 191 goto again
187 } 192 }
188 return n, err 193 return n, err
189 } 194 }
190 195
191 // Write implements the io.Writer interface: 196 // Write implements the io.Writer interface:
192 // it writes data as a frame to the WebSocket connection. 197 // it writes data as a frame to the WebSocket connection.
193 func (ws *Conn) Write(msg []byte) (n int, err os.Error) { 198 func (ws *Conn) Write(msg []byte) (n int, err os.Error) {
199 ws.wio.Lock()
200 defer ws.wio.Unlock()
194 w, err := ws.frameWriterFactory.NewFrameWriter(ws.PayloadType) 201 w, err := ws.frameWriterFactory.NewFrameWriter(ws.PayloadType)
195 if err != nil { 202 if err != nil {
196 return 0, err 203 return 0, err
197 } 204 }
198 n, err = w.Write(msg) 205 n, err = w.Write(msg)
199 w.Close() 206 w.Close()
200 if err != nil { 207 if err != nil {
201 return n, err 208 return n, err
202 } 209 }
203 return n, err 210 return n, err
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
272 279
273 // Send sends v marshaled by cd.Marshal as single frame to ws. 280 // Send sends v marshaled by cd.Marshal as single frame to ws.
274 func (cd Codec) Send(ws *Conn, v interface{}) (err os.Error) { 281 func (cd Codec) Send(ws *Conn, v interface{}) (err os.Error) {
275 if err != nil { 282 if err != nil {
276 return err 283 return err
277 } 284 }
278 data, payloadType, err := cd.Marshal(v) 285 data, payloadType, err := cd.Marshal(v)
279 if err != nil { 286 if err != nil {
280 return err 287 return err
281 } 288 }
289 ws.wio.Lock()
290 defer ws.wio.Unlock()
282 w, err := ws.frameWriterFactory.NewFrameWriter(payloadType) 291 w, err := ws.frameWriterFactory.NewFrameWriter(payloadType)
283 _, err = w.Write(data) 292 _, err = w.Write(data)
284 w.Close() 293 w.Close()
285 return err 294 return err
286 } 295 }
287 296
288 // Receive receives single frame from ws, unmarshaled by cd.Unmarshal and stores in v. 297 // Receive receives single frame from ws, unmarshaled by cd.Unmarshal and stores in v.
289 func (cd Codec) Receive(ws *Conn, v interface{}) (err os.Error) { 298 func (cd Codec) Receive(ws *Conn, v interface{}) (err os.Error) {
299 ws.rio.Lock()
300 defer ws.rio.Unlock()
290 if ws.frameReader != nil { 301 if ws.frameReader != nil {
291 _, err = io.Copy(ioutil.Discard, ws.frameReader) 302 _, err = io.Copy(ioutil.Discard, ws.frameReader)
292 if err != nil { 303 if err != nil {
293 return err 304 return err
294 } 305 }
295 ws.frameReader = nil 306 ws.frameReader = nil
296 } 307 }
297 again: 308 again:
298 frame, err := ws.frameReaderFactory.NewFrameReader() 309 frame, err := ws.frameReaderFactory.NewFrameReader()
299 if err != nil { 310 if err != nil {
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
386 } 397 }
387 398
388 // receive JSON type T 399 // receive JSON type T
389 var data T 400 var data T
390 websocket.JSON.Receive(ws, &data) 401 websocket.JSON.Receive(ws, &data)
391 402
392 // send JSON type T 403 // send JSON type T
393 websocket.JSON.Send(ws, data) 404 websocket.JSON.Send(ws, data)
394 */ 405 */
395 var JSON = Codec{jsonMarshal, jsonUnmarshal} 406 var JSON = Codec{jsonMarshal, jsonUnmarshal}
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b