Legend:
- Unmodified
- Added
- Removed
-
ucomet.py
r44 r45 1 1 from twisted.web2 import resource, http, http_headers, stream 2 from randomimport random2 import random 3 3 import string 4 4 import cjson … … 14 14 usedIds.add(None) 15 15 @staticmethod 16 def getId(chars=32):16 def makeId(chars=32): 17 17 id = None 18 18 while id in Utils.usedIds: 19 id = ''.join(random.sample(string.letters+string.digits, 12))19 id = ''.join(random.sample(string.letters+string.digits, chars)) 20 20 Utils.usedIds.add(id) 21 21 return id … … 40 40 jsonp = "jsonp" 41 41 newstream = stream.ProducerStream() 42 self.streams.append(newstream) 43 if len(self.streams) > len(self.message_queue): 44 amsg = [] 45 else: 42 self.streams.append((newstream,jsonp)) 43 amsg = [] 44 if len(self.streams) <= len(self.message_queue): 46 45 amsg = self.message_queue.pop() 47 46 if addMessage: amsg.insert(0,addMessage) 48 47 self.message_queue.append(amsg) 49 return newstream 48 print "Created Stream!!" 49 return makeResponse(newstream) 50 50 51 51 52 52 def connect(self, message, args): 53 print "message connect %s" % message 53 54 msg = { 54 55 "channel": "/meta/connect", … … 58 59 "advice": {"reconnect": "retry"} 59 60 } 60 return self.addStream(message, args, msg) 61 str = self.addStream(message, args, msg) 62 if len(self.message_queue[0]) > 1: 63 self.deliver() 64 return str 61 65 62 66 def flushStreams(self): … … 71 75 self.message_queue[0].append(message) 72 76 73 if len(self.streams) : #and len(self.message_queue[0]):77 if len(self.streams) and len(self.message_queue[0]): 74 78 stream,jsoncallback = self.streams.pop(0) 79 print "delivering %s" % ( "%s(%s)" % (jsoncallback, 80 encodejson(self.message_queue[0]))) 75 81 stream.write( "%s(%s)" % (jsoncallback, 76 encodejson(self.message_queue [0])))82 encodejson(self.message_queue.pop(0)))) 77 83 78 84 stream.finish() 79 85 del stream 86 else: 87 "might be delivering eventually %s" % message 80 88 81 89 class ClientSubscription(object): … … 106 114 ClientSubscriptionType = ClientSubscription): 107 115 self.ClientSubscriptionType = ClientSubscriptionType 108 self.connection = None116 self.connection = Connection() 109 117 self.subscriptions = {} 110 118 if id: … … 114 122 115 123 def connect(self,msg,args): 116 if not self.connection:117 self.connection = Connection()118 124 return self.connection.connect(msg, args) 119 125 … … 134 140 return self._subunsubscribe(msg, args, nmsg) 135 141 else: 136 return makeResponse( 137 encodejson([{"error":"already subscribed"}]), 138 code = HTTP_ERROR) 142 return {"error":"already subscribed"} 139 143 140 144 def unsubscribe(self,msg,args): … … 150 154 return self._subunsubscribe(msg, args, nmsg) 151 155 else: 152 return makeResponse( 153 encodejson([{"error":"not even subscribed"}]), 154 code = HTTP_ERROR) 156 return {"error":"not even subscribed"} 155 157 156 158 def dispatch(self,msg,args): 157 159 channel = msg["channel"] 158 160 data = msg["data"] 159 if channel in self.suscriptions: 160 self.subscriptions[channel].messageReceived(data) 161 else: 162 return makeResponse( 163 encodejson([{"error":"not subscribed to %s" % channel}]), 164 code = HTTP_ERROR) 165 161 if channel in self.subscriptions: 162 #conn = self.connection.addStream(msg, args) 163 self.subscriptions[channel].messageReceived(data) 164 return {"successful":True,"channel":channel} 165 else: 166 return {"error":"not subscribed to %s" % channel} 166 167 167 168 def _subunsubscribe(self,msg,args,nmsg): 168 169 self.connection.flushStreams() 169 stream = self.connection.addStream(msg, args,nmsg)170 self.connection.deliver()171 return stream170 #stream = self.connection.addStream(msg, args,nmsg) 171 #self.connection.deliver() 172 return nmsg 172 173 173 174 174 175 def sendMessage(self, message, channel): 175 176 if self.connection is not None: 176 self.connection.deliver({"data":message, "channel":channel })177 self.connection.deliver({"data":message, "channel":channel, "clientId":self.id}) 177 178 else: 178 179 #FIXME make own exception … … 186 187 newHeaders.addRawHeader("Content-type", type) 187 188 for (k,v) in headers.items(): newHeaders.addRawHeader(k, v) 189 print "RESPONSE STREAM: %s" % stream 188 190 return http.Response(code=code, headers=newHeaders, stream=stream) 189 191 … … 213 215 if type(messages) is not list: 214 216 return makeResponse("Messages must be encapsulated in array", 215 type="text/plain", code=HTTP_ERROR) 217 type="text/plain", code=HTTP_ERROR) 218 print "messages %s" % messages 219 stream = None 220 responses = [] 216 221 connection = None 217 responses = []218 222 for m in messages: 219 223 … … 224 228 clientId = m["clientId"] 225 229 if clientId in self.clients: 226 client = self.client [client]230 client = self.clients[clientId] 227 231 connection = client.connection #it still may be None 228 232 229 if type(m) is dict:233 if type(m) is not dict: 230 234 responses.append({"error":"each message must be in dict format"}) 235 print("NOT DICT") 231 236 continue 232 237 233 238 if "channel" not in m: 234 239 responses.append({"error":"channel is required in message"}) 240 print("NO CHANNEL") 235 241 continue 236 242 … … 238 244 239 245 if channel == "/meta/handshake": 240 return self.handshake(args,m) 246 resp = self.handshake(m,args) 247 return resp 248 249 250 251 241 252 if channel == "/meta/connect" or \ 242 253 channel == "/meta/reconnect": 243 connection = client.connect(args,m) 244 if channel == "/meta/subscribe": 245 connection = client.subscribe(args,m) 246 if channel == "/meta/unsubscribe": 247 connection = client.unsubscribe(args,m) 248 else: connection = client.dispatch(args,m) 249 250 return connection 251 252 253 254 def handshake(self, args, message): 254 print "M %s" %m 255 stream = client.connect(m,args) 256 elif channel == "/meta/subscribe": 257 responses.append(client.subscribe(m,args)) 258 elif channel == "/meta/unsubscribe": 259 responses.append(client.unsubscribe(m,args)) 260 else: 261 responses.append(client.dispatch(m,args)) 262 263 if not stream or not client: 264 return makeResponse(encodejson(responses)) 265 else: 266 for r in responses: 267 connection.deliver(r) 268 269 return stream 270 271 272 273 def handshake(self, message, args): 255 274 #FIXME add error handlign if we're already hand shaken 256 275 257 276 client = self.ClientType(ClientSubscriptionType=self.ClientSubscriptionType) 258 277 self.clients[client.id] = client 259 return {278 return makeResponse(encodejson([{ 260 279 "channel" :"/meta/handshake", 261 280 "version" :self.version, … … 266 285 "error" :None, 267 286 "id" :message["id"] 268 } 269 270 271 272 273 274 275 287 }])) 288 289 290 291 292 293 294
