LEFT | RIGHT |
1 // Copyright 2011 The Go Authors. All rights reserved. | 1 // Copyright 2011 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package ssh | 5 package ssh |
6 | 6 |
7 import ( | 7 import ( |
| 8 "encoding/binary" |
8 "errors" | 9 "errors" |
9 "fmt" | 10 "fmt" |
10 "io" | 11 "io" |
| 12 "log" |
11 "sync" | 13 "sync" |
12 "sync/atomic" | |
13 ) | 14 ) |
14 | 15 |
15 // extendedDataTypeCode identifies an OpenSSL extended data type. See RFC 4254, | 16 // A Channel is an ordered, reliable, duplex stream that is |
16 // section 5.2. | 17 // multiplexed over an SSH connection. |
17 type extendedDataTypeCode uint32 | |
18 | |
19 const ( | |
20 » // extendedDataStderr is the extended data type that is used for stderr. | |
21 » extendedDataStderr extendedDataTypeCode = 1 | |
22 | |
23 » // minPacketLength defines the smallest valid packet | |
24 » minPacketLength = 9 | |
25 ) | |
26 | |
27 // A Channel is an ordered, reliable, duplex stream that is multiplexed over an | |
28 // SSH connection. Channel.Read can return a ChannelRequest as an error. | |
29 type Channel interface { | 18 type Channel interface { |
30 // Accept accepts the channel creation request. | 19 // Accept accepts the channel creation request. |
31 Accept() error | 20 Accept() error |
32 | 21 |
33 » // Reject rejects the channel creation request. After calling this, no | 22 » // Reject rejects the channel creation request. After calling |
34 » // other methods on the Channel may be called. If they are then the | 23 » // this, no other methods on the Channel may be called. |
35 » // peer is likely to signal a protocol error and drop the connection. | |
36 Reject(reason RejectionReason, message string) error | 24 Reject(reason RejectionReason, message string) error |
37 | 25 |
| 26 // Read may return a ChannelRequest as an error. |
38 Read(data []byte) (int, error) | 27 Read(data []byte) (int, error) |
39 Write(data []byte) (int, error) | 28 Write(data []byte) (int, error) |
| 29 |
| 30 // Signals end of channel use. No data may be sent after this |
| 31 // call. |
40 Close() error | 32 Close() error |
41 | 33 |
42 » // Stderr returns an io.ReadWriter that writes to this channel with the | 34 » // Stderr returns an io.Writer that writes to this channel with the |
43 // extended data type set to stderr. | 35 // extended data type set to stderr. |
44 » Stderr() io.ReadWriter | 36 » Stderr() io.Writer |
45 | |
46 » // SendRequest sends a channel request. | |
47 » SendRequest(name string, wantReply bool, payload []byte) (bool, error) | |
48 | |
49 » // ReceivedRequests returns the channel for out-of-band | |
50 » // requests. For all requests that have WantReply, a call of | |
51 » // AckRequest should follow. | |
52 » ReceivedRequests() chan *ChannelRequest | |
53 | 37 |
54 // AckRequest either sends an ack or nack to the channel | 38 // AckRequest either sends an ack or nack to the channel |
55 // request. It should only be called if the last | 39 // request. It should only be called if the last |
56 // ChannelRequest had a WantReply | 40 // ChannelRequest had a WantReply |
57 AckRequest(ok bool) error | 41 AckRequest(ok bool) error |
58 | 42 |
59 // ChannelType returns the type of the channel, as supplied by the | 43 // ChannelType returns the type of the channel, as supplied by the |
60 // client. | 44 // client. |
61 ChannelType() string | 45 ChannelType() string |
62 » // ExtraData returns the arbitary payload for this channel, as supplied | 46 |
| 47 » // ExtraData returns the arbitrary payload for this channel, as supplied |
63 // by the client. This data is specific to the channel type. | 48 // by the client. This data is specific to the channel type. |
64 ExtraData() []byte | 49 ExtraData() []byte |
65 } | 50 } |
66 | 51 |
67 // ChannelRequest represents a request sent on a channel, outside of the normal | 52 // ChannelRequest represents a request sent, outside of the normal |
68 // stream of bytes. It may result from calling Read on a Channel. | 53 // stream of bytes. |
69 type ChannelRequest struct { | 54 type ChannelRequest struct { |
70 Request string | 55 Request string |
71 WantReply bool | 56 WantReply bool |
72 Payload []byte | 57 Payload []byte |
73 } | 58 } |
74 | 59 |
75 func (c ChannelRequest) Error() string { | 60 func (c ChannelRequest) Error() string { |
76 return "ssh: channel request received" | 61 return "ssh: channel request received" |
77 } | 62 } |
78 | 63 |
79 // RejectionReason is an enumeration used when rejecting channel creation | 64 // RejectionReason is an enumeration used when rejecting channel creation |
80 // requests. See RFC 4254, section 5.1. | 65 // requests. See RFC 4254, section 5.1. |
81 type RejectionReason uint32 | 66 type RejectionReason uint32 |
82 | 67 |
83 const ( | 68 const ( |
84 Prohibited RejectionReason = iota + 1 | 69 Prohibited RejectionReason = iota + 1 |
85 ConnectionFailed | 70 ConnectionFailed |
86 UnknownChannelType | 71 UnknownChannelType |
87 ResourceShortage | 72 ResourceShortage |
88 ) | 73 ) |
89 | 74 |
| 75 // String converts the rejection reason to human readable form. |
90 func (r RejectionReason) String() string { | 76 func (r RejectionReason) String() string { |
91 switch r { | 77 switch r { |
92 case Prohibited: | 78 case Prohibited: |
93 » » return "Prohibited" | 79 » » return "administratively prohibited" |
94 case ConnectionFailed: | 80 case ConnectionFailed: |
95 » » return "ConnectionFailed" | 81 » » return "connect failed" |
96 case UnknownChannelType: | 82 case UnknownChannelType: |
97 » » return "UnknownChannelType" | 83 » » return "unknown channel type" |
98 case ResourceShortage: | 84 case ResourceShortage: |
99 » » return "ResourceShortage" | 85 » » return "resource shortage" |
100 } | 86 } |
101 return fmt.Sprintf("unknown reason %d", int(r)) | 87 return fmt.Sprintf("unknown reason %d", int(r)) |
102 } | 88 } |
103 | 89 |
| 90 // channel implements the Channel |
104 type channel struct { | 91 type channel struct { |
105 » packetConn packetConn // the underlying transport | 92 » // R/O after creation |
| 93 » chanType string |
| 94 » extraData []byte |
106 localId, remoteId uint32 | 95 localId, remoteId uint32 |
107 remoteWin window | |
108 maxPacket uint32 | 96 maxPacket uint32 |
109 » isClosed uint32 // atomic bool, non zero if true | 97 » mux *mux |
110 } | 98 |
111 | 99 » // If set, we have called Accept or Reject on this channel |
112 func (c *channel) sendWindowAdj(n int) error { | 100 » decided bool |
113 » msg := windowAdjustMsg{ | 101 |
114 » » PeersId: c.remoteId, | 102 » // Pending internal channel messages. |
| 103 » msg chan interface{} |
| 104 |
| 105 » // Pending user-serviceable messages. |
| 106 » sentRequestMu sync.Mutex |
| 107 |
| 108 » incomingRequests chan *ChannelRequest |
| 109 |
| 110 » sentEOF bool |
| 111 |
| 112 » // thread-safe data |
| 113 » remoteWin window |
| 114 » pending *buffer |
| 115 » extPending *buffer |
| 116 |
| 117 » // Protects all of the below. |
| 118 » mu sync.Mutex |
| 119 » myWindow uint32 |
| 120 » sentClose bool |
| 121 } |
| 122 |
| 123 func (c *channel) getWindowSpace(max uint32) (uint32, error) { |
| 124 » // check if closed? |
| 125 » return c.remoteWin.reserve(max), nil |
| 126 } |
| 127 |
| 128 // writePacket sends the packet over the wire. If the packet is a |
| 129 // channel close, it updates sentClose. This method takes the lock |
| 130 // c.mu. |
| 131 func (c *channel) writePacket(packet []byte) error { |
| 132 » if uint32(len(packet)) > c.maxPacket { |
| 133 » » return fmt.Errorf("ssh: cannot write %d bytes, maxPacket is %d b
ytes", len(packet), c.maxPacket) |
| 134 » } |
| 135 |
| 136 » c.mu.Lock() |
| 137 » if c.sentClose { |
| 138 » » c.mu.Unlock() |
| 139 » » return io.EOF |
| 140 » } |
| 141 » c.sentClose = (packet[0] == msgChannelClose) |
| 142 » err := c.mux.conn.writePacket(packet) |
| 143 » c.mu.Unlock() |
| 144 » return err |
| 145 } |
| 146 |
| 147 func (c *channel) sendMessage(code byte, msg interface{}) error { |
| 148 » if debug { |
| 149 » » log.Printf("send %d: %#v", c.mux.chanList.offset, msg) |
| 150 » } |
| 151 |
| 152 » p := marshal(code, msg) |
| 153 » binary.BigEndian.PutUint32(p[1:], c.remoteId) |
| 154 » return c.writePacket(p) |
| 155 } |
| 156 |
| 157 func min(a uint32, b int) uint32 { |
| 158 » if a < uint32(b) { |
| 159 » » return a |
| 160 » } |
| 161 » return uint32(b) |
| 162 } |
| 163 |
| 164 func (c *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err er
ror) { |
| 165 » if c.sentEOF { |
| 166 » » return 0, io.EOF |
| 167 » } |
| 168 » // 1 byte message type, 4 bytes remoteId, 4 bytes data length |
| 169 » opCode := byte(msgChannelData) |
| 170 » headerLength := uint32(9) |
| 171 » if extendedCode > 0 { |
| 172 » » headerLength += 4 |
| 173 » » opCode = msgChannelExtendedData |
| 174 » } |
| 175 |
| 176 » for len(data) > 0 { |
| 177 » » space := min(c.maxPacket-headerLength, len(data)) |
| 178 » » if space, err = c.getWindowSpace(space); err != nil { |
| 179 » » » return n, err |
| 180 » » } |
| 181 » » todo := data |
| 182 » » if uint32(len(todo)) > space { |
| 183 » » » todo = todo[:space] |
| 184 » » } |
| 185 |
| 186 » » packet := make([]byte, headerLength+uint32(len(todo))) |
| 187 » » packet[0] = opCode |
| 188 » » marshalUint32(packet[1:], c.remoteId) |
| 189 » » if extendedCode > 0 { |
| 190 » » » marshalUint32(packet[5:], uint32(extendedCode)) |
| 191 » » } |
| 192 » » marshalUint32(packet[headerLength-4:], uint32(len(todo))) |
| 193 » » copy(packet[headerLength:], todo) |
| 194 » » if err = c.writePacket(packet); err != nil { |
| 195 » » » return n, err |
| 196 » » } |
| 197 |
| 198 » » n += len(todo) |
| 199 » » data = data[len(todo):] |
| 200 » } |
| 201 |
| 202 » return n, err |
| 203 } |
| 204 |
| 205 func (c *channel) handleData(packet []byte) error { |
| 206 » sz := 9 |
| 207 » if packet[0] == msgChannelExtendedData { |
| 208 » » sz = 13 |
| 209 » } |
| 210 » if len(packet) < sz { |
| 211 » » // malformed data packet |
| 212 » » return ParseError{packet[0]} |
| 213 » } |
| 214 |
| 215 » var extended uint32 |
| 216 » if sz > 9 { |
| 217 » » extended = binary.BigEndian.Uint32(packet[5:]) |
| 218 » } |
| 219 |
| 220 » length := binary.BigEndian.Uint32(packet[sz-4 : sz]) |
| 221 » if length == 0 { |
| 222 » » return nil |
| 223 » } |
| 224 » data := packet[sz:] |
| 225 » if length != uint32(len(data)) { |
| 226 » » return errors.New("ssh: wrong packet length") |
| 227 » } |
| 228 |
| 229 » c.mu.Lock() |
| 230 » if c.myWindow < length { |
| 231 » » c.mu.Unlock() |
| 232 » » // TODO(hanwen): should send Disconnect with reason? |
| 233 » » return errors.New("ssh: remote side wrote too much") |
| 234 » } |
| 235 » c.myWindow -= length |
| 236 » c.mu.Unlock() |
| 237 |
| 238 » if extended == 1 { |
| 239 » » c.extPending.write(data) |
| 240 » } else if extended > 0 { |
| 241 » » // discard other extended data. |
| 242 » } else { |
| 243 » » c.pending.write(data) |
| 244 » } |
| 245 » return nil |
| 246 } |
| 247 |
| 248 func (c *channel) adjustWindow(n uint32) error { |
| 249 » c.mu.Lock() |
| 250 » c.myWindow += uint32(n) |
| 251 » c.mu.Unlock() |
| 252 » return c.sendMessage(msgChannelWindowAdjust, windowAdjustMsg{ |
115 AdditionalBytes: uint32(n), | 253 AdditionalBytes: uint32(n), |
116 » } | 254 » }) |
117 » return c.writePacket(marshal(msgChannelWindowAdjust, msg)) | 255 } |
118 } | 256 |
119 | 257 func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error)
{ |
120 // sendEOF sends EOF to the remote side. RFC 4254 Section 5.3 | 258 » if extended == 1 { |
121 func (c *channel) sendEOF() error { | 259 » » n, err = c.extPending.Read(data) |
122 » return c.writePacket(marshal(msgChannelEOF, channelEOFMsg{ | 260 » } else if extended == 0 { |
123 » » PeersId: c.remoteId, | 261 » » n, err = c.pending.Read(data) |
124 » })) | 262 » } else { |
125 } | 263 » » return 0, fmt.Errorf("ssh: extended code %d implemented", extend
ed) |
126 | 264 » } |
127 // sendClose informs the remote side of our intent to close the channel. | 265 |
128 func (c *channel) sendClose() error { | 266 » if n > 0 { |
129 » return c.packetConn.writePacket(marshal(msgChannelClose, channelCloseMsg
{ | 267 » » err = c.adjustWindow(uint32(n)) |
130 » » PeersId: c.remoteId, | 268 » » // sendWindowAdjust can return io.EOF if the remote |
131 » })) | 269 » » // peer has closed the connection, however we want to |
132 } | 270 » » // defer forwarding io.EOF to the caller of Read until |
133 | 271 » » // the buffer has been drained. |
134 func (c *channel) sendChannelOpenFailure(reason RejectionReason, message string)
error { | 272 » » if n > 0 && err == io.EOF { |
135 » reject := channelOpenFailureMsg{ | 273 » » » err = nil |
136 » » PeersId: c.remoteId, | 274 » » } |
137 » » Reason: reason, | 275 » } |
138 » » Message: message, | 276 |
139 » » Language: "en", | 277 » return n, err |
140 » } | 278 } |
141 » return c.writePacket(marshal(msgChannelOpenFailure, reject)) | 279 |
142 } | 280 func (c *channel) handlePacket(packet []byte) error { |
143 | 281 » if uint32(len(packet)) > c.maxPacket { |
144 func (c *channel) writePacket(b []byte) error { | 282 » » // TODO(hanwen): should send Disconnect? |
145 » if c.closed() { | 283 » » return errors.New("ssh: incoming packet exceeds maximum size") |
146 » » return io.EOF | 284 » } |
147 » } | 285 |
148 » if uint32(len(b)) > c.maxPacket { | 286 » switch packet[0] { |
149 » » return fmt.Errorf("ssh: cannot write %d bytes, maxPacket is %d b
ytes", len(b), c.maxPacket) | 287 » case msgChannelData, msgChannelExtendedData: |
150 » } | 288 » » return c.handleData(packet) |
151 » return c.packetConn.writePacket(b) | 289 » case msgChannelClose: |
152 } | 290 » » // Ack the close. |
153 | 291 » » c.sendMessage(msgChannelClose, channelCloseMsg{ |
154 func (c *channel) closed() bool { | 292 » » » PeersId: c.remoteId}) |
155 » return atomic.LoadUint32(&c.isClosed) > 0 | 293 |
156 } | 294 » » c.pending.eof() |
157 | 295 » » c.extPending.eof() |
158 func (c *channel) setClosed() bool { | 296 » » close(c.msg) |
159 » return atomic.CompareAndSwapUint32(&c.isClosed, 0, 1) | 297 » » close(c.incomingRequests) |
160 } | 298 » » c.mux.chanList.remove(c.localId) |
161 | 299 |
162 type serverChan struct { | 300 » » return nil |
163 » channel | 301 » case msgChannelEOF: |
164 » // immutable once created | 302 » » // RFC 4254 is mute on how EOF affects dataExt messages but |
165 » chanType string | 303 » » // it is logical to signal EOF at the same time. |
166 » extraData []byte | 304 » » c.extPending.eof() |
167 | 305 |
168 » serverConn *ServerConn | 306 » » // For ServerConn, ChannelRequests are actually output |
169 » myWindow uint32 | 307 » » // as Read error. This means that no requests can be |
170 » theyClosed bool // indicates the close msg has been received from the r
emote side | 308 » » // processed after EOF is sent, which is a bug |
171 » theySentEOF bool | 309 » » c.pending.eof() |
172 » isDead uint32 | 310 » » return nil |
173 » err error | 311 » } |
174 | 312 |
175 » pendingRequests []ChannelRequest | 313 » decoded, err := decode(packet) |
176 » pendingData []byte | 314 » if err != nil { |
177 » head, length int | 315 » » return err |
178 | 316 » } |
179 » // This lock is inferior to serverConn.lock | 317 |
180 » cond *sync.Cond | 318 » switch msg := decoded.(type) { |
181 } | 319 » case *windowAdjustMsg: |
182 | 320 » » if !c.remoteWin.add(msg.AdditionalBytes) { |
183 func (c *serverChan) ReceivedRequests() chan *ChannelRequest { | 321 » » » return fmt.Errorf("invalid window update %d", msg.Additi
onalBytes) |
184 » panic("not implemented") | 322 » » } |
| 323 |
| 324 » case *channelRequestMsg: |
| 325 » » req := ChannelRequest{ |
| 326 » » » Request: msg.Request, |
| 327 » » » WantReply: msg.WantReply, |
| 328 » » » Payload: msg.RequestSpecificData, |
| 329 » » } |
| 330 |
| 331 » » c.incomingRequests <- &req |
| 332 » default: |
| 333 » » c.msg <- msg |
| 334 » } |
185 return nil | 335 return nil |
186 } | 336 } |
187 | 337 |
188 func (c *serverChan) Accept() error { | 338 func newChannel(chanType string, extraData []byte) *channel { |
189 » c.serverConn.lock.Lock() | 339 » return &channel{ |
190 » defer c.serverConn.lock.Unlock() | 340 » » remoteWin: window{Cond: newCond()}, |
191 | 341 » » myWindow: defaultWindowSize, |
192 » if c.serverConn.err != nil { | 342 » » pending: newBuffer(), |
193 » » return c.serverConn.err | 343 » » extPending: newBuffer(), |
194 » } | 344 » » incomingRequests: make(chan *ChannelRequest, 16), |
195 | 345 » » msg: make(chan interface{}, 16), |
| 346 » » chanType: chanType, |
| 347 » » extraData: extraData, |
| 348 » } |
| 349 } |
| 350 |
| 351 var errUndecided = errors.New("ssh: must Accept or Reject channel") |
| 352 var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once") |
| 353 |
| 354 type extChannel struct { |
| 355 » code uint32 |
| 356 » ch *channel |
| 357 } |
| 358 |
| 359 func (e *extChannel) Write(data []byte) (n int, err error) { |
| 360 » return e.ch.WriteExtended(data, e.code) |
| 361 } |
| 362 func (e *extChannel) Read(data []byte) (n int, err error) { |
| 363 » return e.ch.ReadExtended(data, e.code) |
| 364 } |
| 365 |
| 366 func (c *channel) Accept() error { |
| 367 » if c.decided { |
| 368 » » return errDecidedAlready |
| 369 » } |
196 confirm := channelOpenConfirmMsg{ | 370 confirm := channelOpenConfirmMsg{ |
197 PeersId: c.remoteId, | 371 PeersId: c.remoteId, |
198 MyId: c.localId, | 372 MyId: c.localId, |
199 MyWindow: c.myWindow, | 373 MyWindow: c.myWindow, |
200 MaxPacketSize: c.maxPacket, | 374 MaxPacketSize: c.maxPacket, |
201 } | 375 } |
202 » return c.writePacket(marshal(msgChannelOpenConfirm, confirm)) | 376 » c.decided = true |
203 } | 377 » if err := c.sendMessage(msgChannelOpenConfirm, confirm); err != nil { |
204 | 378 » » return err |
205 func (c *serverChan) Reject(reason RejectionReason, message string) error { | 379 » } |
206 » c.serverConn.lock.Lock() | 380 |
207 » defer c.serverConn.lock.Unlock() | 381 » return nil |
208 | 382 } |
209 » if c.serverConn.err != nil { | 383 |
210 » » return c.serverConn.err | 384 func (ch *channel) Reject(reason RejectionReason, message string) error { |
211 » } | 385 » if ch.decided { |
212 | 386 » » return errDecidedAlready |
213 » return c.sendChannelOpenFailure(reason, message) | 387 » } |
214 } | 388 » reject := channelOpenFailureMsg{ |
215 | 389 » » PeersId: ch.remoteId, |
216 func (c *serverChan) SendRequest(name string, wantReply bool, data []byte) (bool
, error) { | 390 » » Reason: reason, |
217 » req := &channelRequestMsg{ | 391 » » Message: message, |
218 » » PeersId: c.remoteId, | 392 » » Language: "en", |
| 393 » } |
| 394 » ch.decided = true |
| 395 » return ch.sendMessage(msgChannelOpenFailure, reject) |
| 396 } |
| 397 |
| 398 func (ch *channel) Read(data []byte) (int, error) { |
| 399 » if !ch.decided { |
| 400 » » return 0, errUndecided |
| 401 » } |
| 402 |
| 403 » return ch.ReadExtended(data, 0) |
| 404 } |
| 405 |
| 406 func (ch *channel) Write(data []byte) (int, error) { |
| 407 » if !ch.decided { |
| 408 » » return 0, errUndecided |
| 409 » } |
| 410 » return ch.WriteExtended(data, 0) |
| 411 } |
| 412 |
| 413 func (ch *channel) CloseWrite() error { |
| 414 » if !ch.decided { |
| 415 » » return errUndecided |
| 416 » } |
| 417 » ch.sentEOF = true |
| 418 » return ch.sendMessage(msgChannelEOF, channelEOFMsg{ |
| 419 » » PeersId: ch.remoteId}) |
| 420 } |
| 421 |
| 422 func (ch *channel) Close() error { |
| 423 » if !ch.decided { |
| 424 » » return errUndecided |
| 425 » } |
| 426 |
| 427 » return ch.sendMessage(msgChannelClose, channelCloseMsg{ |
| 428 » » PeersId: ch.remoteId}) |
| 429 } |
| 430 |
| 431 func (ch *channel) Extended(code uint32) io.ReadWriter { |
| 432 » if !ch.decided { |
| 433 » » return nil |
| 434 » } |
| 435 » return &extChannel{code, ch} |
| 436 } |
| 437 |
| 438 // SendRequest sends a channel request. If wantReply is set, it will |
| 439 // wait for a reply and return the result as a boolean. |
| 440 func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (boo
l, error) { |
| 441 » if !ch.decided { |
| 442 » » return false, errUndecided |
| 443 » } |
| 444 |
| 445 » if wantReply { |
| 446 » » ch.sentRequestMu.Lock() |
| 447 » » defer ch.sentRequestMu.Unlock() |
| 448 » } |
| 449 |
| 450 » msg := channelRequestMsg{ |
| 451 » » PeersId: ch.remoteId, |
219 Request: name, | 452 Request: name, |
220 WantReply: wantReply, | 453 WantReply: wantReply, |
221 » » RequestSpecificData: data, | 454 » » RequestSpecificData: payload, |
222 » } | 455 » } |
223 » if req.WantReply { | 456 |
224 » » panic("not implemented") | 457 » if err := ch.sendMessage(msgChannelRequest, msg); err != nil { |
225 » } | 458 » » return false, err |
226 » err := c.writePacket(marshal(msgChannelRequest, req)) | 459 » } |
227 » return false, err | 460 |
228 } | 461 » if wantReply { |
229 | 462 » » m, ok := (<-ch.msg) |
230 func (c *serverChan) handlePacket(packet interface{}) { | 463 » » if !ok { |
231 » c.cond.L.Lock() | 464 » » » return false, io.EOF |
232 » defer c.cond.L.Unlock() | 465 » » } |
233 | 466 » » switch m.(type) { |
234 » switch packet := packet.(type) { | 467 » » case *channelRequestFailureMsg: |
235 » case *channelRequestMsg: | 468 » » » return false, nil |
236 » » req := ChannelRequest{ | 469 » » case *channelRequestSuccessMsg: |
237 » » » Request: packet.Request, | 470 » » » return true, nil |
238 » » » WantReply: packet.WantReply, | 471 » » default: |
239 » » » Payload: packet.RequestSpecificData, | 472 » » » return false, fmt.Errorf("unexpected response %#v", m) |
240 » » } | 473 » » } |
241 | 474 » } |
242 » » c.pendingRequests = append(c.pendingRequests, req) | 475 |
243 » » c.cond.Signal() | 476 » return false, nil |
244 » case *channelCloseMsg: | 477 } |
245 » » c.theyClosed = true | 478 |
246 » » c.cond.Signal() | 479 // AckRequest either sends an ack or nack to the channel request. |
247 » case *channelEOFMsg: | 480 func (ch *channel) AckRequest(ok bool) error { |
248 » » c.theySentEOF = true | 481 » if !ch.decided { |
249 » » c.cond.Signal() | 482 » » return errUndecided |
250 » case *windowAdjustMsg: | 483 » } |
251 » » if !c.remoteWin.add(packet.AdditionalBytes) { | 484 |
252 » » » panic("illegal window update") | 485 » var msg interface{} |
253 » » } | 486 » var code byte |
254 » default: | |
255 » » panic("unknown packet type") | |
256 » } | |
257 } | |
258 | |
259 func (c *serverChan) handleData(data []byte) { | |
260 » c.cond.L.Lock() | |
261 » defer c.cond.L.Unlock() | |
262 | |
263 » // The other side should never send us more than our window. | |
264 » if len(data)+c.length > len(c.pendingData) { | |
265 » » // TODO(agl): we should tear down the channel with a protocol | |
266 » » // error. | |
267 » » return | |
268 » } | |
269 | |
270 » c.myWindow -= uint32(len(data)) | |
271 » for i := 0; i < 2; i++ { | |
272 » » tail := c.head + c.length | |
273 » » if tail >= len(c.pendingData) { | |
274 » » » tail -= len(c.pendingData) | |
275 » » } | |
276 » » n := copy(c.pendingData[tail:], data) | |
277 » » data = data[n:] | |
278 » » c.length += n | |
279 » } | |
280 | |
281 » c.cond.Signal() | |
282 } | |
283 | |
284 func (c *serverChan) Stderr() io.ReadWriter { | |
285 » return extendedDataChannel{c: c, t: extendedDataStderr} | |
286 } | |
287 | |
288 // extendedDataChannel is an io.Writer that writes any data to c as extended | |
289 // data of the given type. | |
290 type extendedDataChannel struct { | |
291 » t extendedDataTypeCode | |
292 » c *serverChan | |
293 } | |
294 | |
295 func (edc extendedDataChannel) Read(data []byte) (n int, err error) { | |
296 » panic("unimplemented") | |
297 » return 0, nil | |
298 } | |
299 | |
300 func (edc extendedDataChannel) Write(data []byte) (n int, err error) { | |
301 » const headerLength = 13 // 1 byte message type, 4 bytes remoteId, 4 byte
s extended message type, 4 bytes data length | |
302 » c := edc.c | |
303 » for len(data) > 0 { | |
304 » » space := min(c.maxPacket-headerLength, len(data)) | |
305 » » if space, err = c.getWindowSpace(space); err != nil { | |
306 » » » return 0, err | |
307 » » } | |
308 » » todo := data | |
309 » » if uint32(len(todo)) > space { | |
310 » » » todo = todo[:space] | |
311 » » } | |
312 | |
313 » » packet := make([]byte, headerLength+len(todo)) | |
314 » » packet[0] = msgChannelExtendedData | |
315 » » marshalUint32(packet[1:], c.remoteId) | |
316 » » marshalUint32(packet[5:], uint32(edc.t)) | |
317 » » marshalUint32(packet[9:], uint32(len(todo))) | |
318 » » copy(packet[13:], todo) | |
319 | |
320 » » if err = c.writePacket(packet); err != nil { | |
321 » » » return | |
322 » » } | |
323 | |
324 » » n += len(todo) | |
325 » » data = data[len(todo):] | |
326 » } | |
327 | |
328 » return | |
329 } | |
330 | |
331 func (c *serverChan) Read(data []byte) (n int, err error) { | |
332 » n, err, windowAdjustment := c.read(data) | |
333 | |
334 » if windowAdjustment > 0 { | |
335 » » packet := marshal(msgChannelWindowAdjust, windowAdjustMsg{ | |
336 » » » PeersId: c.remoteId, | |
337 » » » AdditionalBytes: windowAdjustment, | |
338 » » }) | |
339 » » err = c.writePacket(packet) | |
340 » } | |
341 | |
342 » return | |
343 } | |
344 | |
345 func (c *serverChan) read(data []byte) (n int, err error, windowAdjustment uint3
2) { | |
346 » c.cond.L.Lock() | |
347 » defer c.cond.L.Unlock() | |
348 | |
349 » if c.err != nil { | |
350 » » return 0, c.err, 0 | |
351 » } | |
352 | |
353 » for { | |
354 » » if c.theySentEOF || c.theyClosed || c.dead() { | |
355 » » » return 0, io.EOF, 0 | |
356 » » } | |
357 | |
358 » » if len(c.pendingRequests) > 0 { | |
359 » » » req := c.pendingRequests[0] | |
360 » » » if len(c.pendingRequests) == 1 { | |
361 » » » » c.pendingRequests = nil | |
362 » » » } else { | |
363 » » » » oldPendingRequests := c.pendingRequests | |
364 » » » » c.pendingRequests = make([]ChannelRequest, len(o
ldPendingRequests)-1) | |
365 » » » » copy(c.pendingRequests, oldPendingRequests[1:]) | |
366 » » » } | |
367 | |
368 » » » return 0, req, 0 | |
369 » » } | |
370 | |
371 » » if c.length > 0 { | |
372 » » » tail := min(uint32(c.head+c.length), len(c.pendingData)) | |
373 » » » n = copy(data, c.pendingData[c.head:tail]) | |
374 » » » c.head += n | |
375 » » » c.length -= n | |
376 » » » if c.head == len(c.pendingData) { | |
377 » » » » c.head = 0 | |
378 » » » } | |
379 | |
380 » » » windowAdjustment = uint32(len(c.pendingData)-c.length) -
c.myWindow | |
381 » » » if windowAdjustment < uint32(len(c.pendingData)/2) { | |
382 » » » » windowAdjustment = 0 | |
383 » » » } | |
384 » » » c.myWindow += windowAdjustment | |
385 | |
386 » » » return | |
387 » » } | |
388 | |
389 » » c.cond.Wait() | |
390 » } | |
391 | |
392 » panic("unreachable") | |
393 } | |
394 | |
395 // getWindowSpace takes, at most, max bytes of space from the peer's window. It | |
396 // returns the number of bytes actually reserved. | |
397 func (c *serverChan) getWindowSpace(max uint32) (uint32, error) { | |
398 » if c.dead() || c.closed() { | |
399 » » return 0, io.EOF | |
400 » } | |
401 » return c.remoteWin.reserve(max), nil | |
402 } | |
403 | |
404 func (c *serverChan) dead() bool { | |
405 » return atomic.LoadUint32(&c.isDead) > 0 | |
406 } | |
407 | |
408 func (c *serverChan) setDead() { | |
409 » atomic.StoreUint32(&c.isDead, 1) | |
410 } | |
411 | |
412 func (c *serverChan) Write(data []byte) (n int, err error) { | |
413 » const headerLength = 9 // 1 byte message type, 4 bytes remoteId, 4 bytes
data length | |
414 » for len(data) > 0 { | |
415 » » space := min(c.maxPacket-headerLength, len(data)) | |
416 » » if space, err = c.getWindowSpace(space); err != nil { | |
417 » » » return 0, err | |
418 » » } | |
419 » » todo := data | |
420 » » if uint32(len(todo)) > space { | |
421 » » » todo = todo[:space] | |
422 » » } | |
423 | |
424 » » packet := make([]byte, headerLength+len(todo)) | |
425 » » packet[0] = msgChannelData | |
426 » » marshalUint32(packet[1:], c.remoteId) | |
427 » » marshalUint32(packet[5:], uint32(len(todo))) | |
428 » » copy(packet[9:], todo) | |
429 | |
430 » » if err = c.writePacket(packet); err != nil { | |
431 » » » return | |
432 » » } | |
433 | |
434 » » n += len(todo) | |
435 » » data = data[len(todo):] | |
436 » } | |
437 | |
438 » return | |
439 } | |
440 | |
441 // Close signals the intent to close the channel. | |
442 func (c *serverChan) Close() error { | |
443 » c.serverConn.lock.Lock() | |
444 » defer c.serverConn.lock.Unlock() | |
445 | |
446 » if c.serverConn.err != nil { | |
447 » » return c.serverConn.err | |
448 » } | |
449 | |
450 » if !c.setClosed() { | |
451 » » return errors.New("ssh: channel already closed") | |
452 » } | |
453 » return c.sendClose() | |
454 } | |
455 | |
456 func (c *serverChan) AckRequest(ok bool) error { | |
457 » c.serverConn.lock.Lock() | |
458 » defer c.serverConn.lock.Unlock() | |
459 | |
460 » if c.serverConn.err != nil { | |
461 » » return c.serverConn.err | |
462 » } | |
463 | |
464 if !ok { | 487 if !ok { |
465 » » ack := channelRequestFailureMsg{ | 488 » » code = msgChannelFailure |
466 » » » PeersId: c.remoteId, | 489 » » msg = channelRequestFailureMsg{ |
467 » » } | 490 » » » PeersId: ch.remoteId, |
468 » » return c.writePacket(marshal(msgChannelFailure, ack)) | 491 » » } |
469 » } | 492 » } else { |
470 | 493 » » code = msgChannelSuccess |
471 » ack := channelRequestSuccessMsg{ | 494 » » msg = channelRequestSuccessMsg{ |
472 » » PeersId: c.remoteId, | 495 » » » PeersId: ch.remoteId, |
473 » } | 496 » » } |
474 » return c.writePacket(marshal(msgChannelSuccess, ack)) | 497 » } |
475 } | 498 » return ch.sendMessage(code, msg) |
476 | 499 } |
477 func (c *serverChan) ChannelType() string { | 500 |
478 » return c.chanType | 501 func (ch *channel) ChannelType() string { |
479 } | 502 » return ch.chanType |
480 | 503 } |
481 func (c *serverChan) ExtraData() []byte { | 504 |
482 » return c.extraData | 505 func (ch *channel) ExtraData() []byte { |
483 } | 506 » return ch.extraData |
484 | 507 } |
485 // A clientChan represents a single RFC 4254 channel multiplexed | 508 |
486 // over a SSH connection. | 509 // compatChannel is a hack to implement Channel's |
487 type clientChan struct { | 510 // handing of channel requests. |
488 » channel | 511 type compatChannel struct { |
489 » stdin *chanWriter | 512 » *channel |
490 » stdout *chanReader | 513 } |
491 » stderr *chanReader | 514 |
492 » msg chan interface{} | 515 func newCompatChannel(ch *channel) *compatChannel { |
493 } | 516 » c := &compatChannel{ch} |
494 | 517 » go c.loop() |
495 // newClientChan returns a partially constructed *clientChan | |
496 // using the local id provided. To be usable clientChan.remoteId | |
497 // needs to be assigned once known. | |
498 func newClientChan(cc packetConn, id uint32) *clientChan { | |
499 » c := &clientChan{ | |
500 » » channel: channel{ | |
501 » » » packetConn: cc, | |
502 » » » localId: id, | |
503 » » » remoteWin: window{Cond: newCond()}, | |
504 » » }, | |
505 » » msg: make(chan interface{}, 16), | |
506 » } | |
507 » c.stdin = &chanWriter{ | |
508 » » channel: &c.channel, | |
509 » } | |
510 » c.stdout = &chanReader{ | |
511 » » channel: &c.channel, | |
512 » » buffer: newBuffer(), | |
513 » } | |
514 » c.stderr = &chanReader{ | |
515 » » channel: &c.channel, | |
516 » » buffer: newBuffer(), | |
517 » } | |
518 return c | 518 return c |
519 } | 519 } |
520 | 520 |
521 // waitForChannelOpenResponse, if successful, fills out | 521 func (c *compatChannel) loop() { |
522 // the remoteId and records any initial window advertisement. | 522 » for r := range c.channel.incomingRequests { |
523 func (c *clientChan) waitForChannelOpenResponse() error { | 523 » » c.channel.pending.addRequest(r) |
524 » switch msg := (<-c.msg).(type) { | 524 » } |
525 » case *channelOpenConfirmMsg: | 525 } |
526 » » if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<
<31 { | 526 |
527 » » » return errors.New("ssh: invalid MaxPacketSize from peer"
) | 527 func (c *compatChannel) Stderr() io.Writer { |
528 » » } | 528 » return c.Extended(1) |
529 » » // fixup remoteId field | 529 } |
530 » » c.remoteId = msg.MyId | |
531 » » c.maxPacket = msg.MaxPacketSize | |
532 » » c.remoteWin.add(msg.MyWindow) | |
533 » » return nil | |
534 » case *channelOpenFailureMsg: | |
535 » » return errors.New(safeString(msg.Message)) | |
536 » } | |
537 » return errors.New("ssh: unexpected packet") | |
538 } | |
539 | |
540 // Close signals the intent to close the channel. | |
541 func (c *clientChan) Close() error { | |
542 » if !c.setClosed() { | |
543 » » return errors.New("ssh: channel already closed") | |
544 » } | |
545 » c.stdout.eof() | |
546 » c.stderr.eof() | |
547 » return c.sendClose() | |
548 } | |
549 | |
550 // A chanWriter represents the stdin of a remote process. | |
551 type chanWriter struct { | |
552 » *channel | |
553 » // indicates the writer has been closed. eof is owned by the | |
554 » // caller of Write/Close. | |
555 » eof bool | |
556 } | |
557 | |
558 // Write writes data to the remote process's standard input. | |
559 func (w *chanWriter) Write(data []byte) (written int, err error) { | |
560 » const headerLength = 9 // 1 byte message type, 4 bytes remoteId, 4 bytes
data length | |
561 » for len(data) > 0 { | |
562 » » if w.eof || w.closed() { | |
563 » » » err = io.EOF | |
564 » » » return | |
565 » » } | |
566 » » // never send more data than maxPacket even if | |
567 » » // there is sufficent window. | |
568 » » n := min(w.maxPacket-headerLength, len(data)) | |
569 » » r := w.remoteWin.reserve(n) | |
570 » » n = r | |
571 » » remoteId := w.remoteId | |
572 » » packet := []byte{ | |
573 » » » msgChannelData, | |
574 » » » byte(remoteId >> 24), byte(remoteId >> 16), byte(remoteI
d >> 8), byte(remoteId), | |
575 » » » byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n), | |
576 » » } | |
577 » » if err = w.writePacket(append(packet, data[:n]...)); err != nil
{ | |
578 » » » break | |
579 » » } | |
580 » » data = data[n:] | |
581 » » written += int(n) | |
582 » } | |
583 » return | |
584 } | |
585 | |
586 func min(a uint32, b int) uint32 { | |
587 » if a < uint32(b) { | |
588 » » return a | |
589 » } | |
590 » return uint32(b) | |
591 } | |
592 | |
593 func (w *chanWriter) Close() error { | |
594 » w.eof = true | |
595 » return w.sendEOF() | |
596 } | |
597 | |
598 // A chanReader represents stdout or stderr of a remote process. | |
599 type chanReader struct { | |
600 » *channel // the channel backing this reader | |
601 » *buffer | |
602 } | |
603 | |
604 // Read reads data from the remote process's stdout or stderr. | |
605 func (r *chanReader) Read(buf []byte) (int, error) { | |
606 » n, err := r.buffer.Read(buf) | |
607 » if err != nil { | |
608 » » if err == io.EOF { | |
609 » » » return n, err | |
610 » » } | |
611 » » return 0, err | |
612 » } | |
613 » err = r.sendWindowAdj(n) | |
614 » if err == io.EOF && n > 0 { | |
615 » » // sendWindowAdjust can return io.EOF if the remote peer has | |
616 » » // closed the connection, however we want to defer forwarding io
.EOF to the | |
617 » » // caller of Read until the buffer has been drained. | |
618 » » err = nil | |
619 » } | |
620 » return n, err | |
621 } | |
LEFT | RIGHT |