Legend:
- Unmodified
- Added
- Removed
-
cometd.py
r41 r42 301 301 print "I'm delivering foo &&&& %s" % simplejson.dumps 302 302 303 print "$$$$$ delivering %s" % self.backlog 303 304 self.stream.write( 304 305 self.ctypeProps["envelope"] % ( … … 322 323 self.stream = stream.ProducerStream() 323 324 self.deliver() 325 326 class ClientSubscription(object): 327 328 329 def __init__( self, channel ): 330 self.channel = channel 331 self.sendMessageCallback = None #Make sure we set this!!! 332 333 334 def messageReceived(self, message): 335 self.sendMessage(message) 336 337 #Make sure you call this first if you override this message!!! 338 def sendMessage(self, message): 339 self.sendMessageCallback(message, self.channel) 340 341 #Make sure you call this first if you override this message!!! 342 def subscribe(self, channel): 343 return True 344 345 #when we get disconnected or unsubscribed 346 def unsubscribe(self): 347 pass 348 349 class SubscriptionException(Exception): 350 pass 324 351 325 352 class Client: … … 332 359 self.authToken = authToken 333 360 self.lastError = lastError 361 self.cSubscriptions = {} 362 363 #override this if you want different subscriptions to have different classes 364 def generateSubscription(self, channel): 365 return ClientSubscription(channel) 366 367 def subscribe(self, channel): 368 if channel in self.cSubscriptions: 369 raise SubscriptionException("Already subscribed") 370 else: 371 cs = self.generateSubscription(channel) 372 cs.sendMessageCallback = self.sendMessage 373 print "TRYING TO SUBSCRIBE TO " + channel 374 if cs.subscribe(channel): 375 self.cSubscriptions[channel] = cs 376 return True 377 else: 378 del(cs) 379 return False 380 381 #when we get disconnected or unsubscribed 382 def unSubscribe(self, channel): 383 if channel in self.cSubscriptions: 384 cs = self.clientSubscription(channel, self.sendMessage) 385 cs.unsubscribe(channel) 386 del(self.cSubscriptions[channel]) 387 else: 388 raise SubscriptionException("Not subscribed yet and trying to unsubscribe") 334 389 335 390 def setConnection(self, conn): 391 print "Setting Connection &&&&&&&&&&&&&& %s" % conn 336 392 self.connection = conn 337 393 338 394 def messageReceived(self, message, channel): 339 self.sendMessage(message,channel) 340 pass 395 if channel in self.cSubscriptions: 396 self.cSubscriptions[channel].messageReceived(message) 397 else: 398 raise SubscriptionException("Not subscribed yet and trying to send message") 341 399 342 400 def sendMessage(self, message, channel): … … 432 490 433 491 ctr = 0 434 if verbose: log.msg("messages:", type(messages), ":", simplejson.dumps(messages) )492 if verbose: log.msg("messages:", type(messages), ":", simplejson.dumps(messages), " :", messages) 435 493 aStream = None 436 494 print( messages) … … 442 500 client = None 443 501 localResp = [] 444 if "clientId" in m and m["clientId"] in self.clients: 502 if "clientId" in m and m["clientId"] in self.clients\ 503 and self.clients[m["clientId"]].connection is not None: 445 504 client = self.clients[m["clientId"]] 446 505 else: … … 465 524 466 525 chan = m["channel"] 526 print "CHAN: !!!!!!!!!!!!!!!!! %s " % chan 467 527 if chan == "/meta/handshake" and ctr == 0: 468 528 # looks like we'll need to create a Connection … … 620 680 # get the client and the channel here 621 681 # self._subscribe() 682 print "self.subscribe ^^^^^^^^^^^^^^^^" 622 683 if not self._checkClient(request, message): 623 684 # FIXME: we should probably send advice here instead of just raw failure … … 646 707 # FIXME: should we be calling client.deliver and having *that* dispatch 647 708 # down to the correct connection object? 709 print "Sending Message: with %s %s" % (self.clients, client) 648 710 client.connection.deliver(resp) 649 711 # return { "successful": True } … … 679 741 def _globbing_unsubscribe(self, client, chan): 680 742 "remove a subscription" 743 client.unsubscribe(chan) 681 744 682 745 cparts = chan.split("/")[1:] … … 700 763 if verbose: log.msg(cparts) 701 764 root = self.subscriptions 702 for part in cparts: # create parts of the topic tree that don't yet exist 703 if not part in root: 704 if verbose: 705 log.msg("creating part: ", part) 706 root[part] = { "__cometd_subscribers": None } 707 # root[part] = weakref.WeakValueDictionary() 708 root[part]["__cometd_subscribers"] = weakref.WeakValueDictionary() 709 710 root = root[part] 711 712 root["__cometd_subscribers"][client.id] = client 765 # make sure the subscription was successful 766 if client.subscribe( chan ): 767 for part in cparts: # create parts of the topic tree that don't yet exist 768 if not part in root: 769 if verbose: 770 log.msg("creating part: ", part) 771 root[part] = { "__cometd_subscribers": None } 772 # root[part] = weakref.WeakValueDictionary() 773 root[part]["__cometd_subscribers"] = weakref.WeakValueDictionary() 774 775 root = root[part] 776 777 root["__cometd_subscribers"][client.id] = client 713 778 714 779 def _globbing_route(self, request, message): … … 758 823 subs[client].connection.deliver(message) 759 824 return { "successful": True } 760 761 825 # vim:ts=4:noet:
