Changeset 42 for cometd.py

Show
Ignore:
Timestamp:
02/07/08 15:46:20 (4 years ago)
Author:
mike
Message:

updated stuff

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • cometd.py

    r41 r42  
    301301                                print "I'm delivering foo &&&& %s" % simplejson.dumps 
    302302 
     303                                print "$$$$$ delivering %s" % self.backlog 
    303304                                self.stream.write( 
    304305                                        self.ctypeProps["envelope"] % ( 
     
    322323                        self.stream = stream.ProducerStream() 
    323324                        self.deliver() 
     325 
     326class ClientSubscription(object): 
     327         
     328 
     329        def __init__( self, channel ): 
     330                self.channel = channel 
     331                self.sendMessageCallback = None #Make sure we set this!!! 
     332 
     333 
     334        def messageReceived(self, message): 
     335                self.sendMessage(message) 
     336         
     337        #Make sure you call this first if you override this message!!! 
     338        def sendMessage(self, message): 
     339                self.sendMessageCallback(message, self.channel) 
     340         
     341        #Make sure you call this first if you override this message!!! 
     342        def subscribe(self, channel): 
     343                return True 
     344         
     345        #when we get disconnected or unsubscribed 
     346        def unsubscribe(self): 
     347                pass 
     348 
     349class SubscriptionException(Exception): 
     350        pass 
    324351 
    325352class Client: 
     
    332359                self.authToken = authToken 
    333360                self.lastError = lastError 
     361                self.cSubscriptions = {} 
     362 
     363        #override this if you want different subscriptions to have different classes 
     364        def generateSubscription(self, channel): 
     365                return  ClientSubscription(channel) 
     366 
     367        def subscribe(self, channel): 
     368                if channel in self.cSubscriptions: 
     369                        raise SubscriptionException("Already subscribed") 
     370                else: 
     371                        cs = self.generateSubscription(channel) 
     372                        cs.sendMessageCallback = self.sendMessage 
     373                        print "TRYING TO SUBSCRIBE TO " + channel 
     374                        if cs.subscribe(channel): 
     375                                self.cSubscriptions[channel] = cs 
     376                                return True 
     377                        else: 
     378                                del(cs) 
     379                                return False 
     380         
     381        #when we get disconnected or unsubscribed 
     382        def unSubscribe(self, channel): 
     383                if channel in self.cSubscriptions: 
     384                        cs = self.clientSubscription(channel, self.sendMessage) 
     385                        cs.unsubscribe(channel) 
     386                        del(self.cSubscriptions[channel]) 
     387                else: 
     388                        raise SubscriptionException("Not subscribed yet and trying to unsubscribe") 
    334389 
    335390        def setConnection(self, conn): 
     391                print "Setting Connection &&&&&&&&&&&&&& %s" % conn 
    336392                self.connection = conn 
    337393 
    338394        def messageReceived(self, message, channel): 
    339                 self.sendMessage(message,channel) 
    340                 pass 
     395                if channel in self.cSubscriptions: 
     396                        self.cSubscriptions[channel].messageReceived(message) 
     397                else: 
     398                        raise SubscriptionException("Not subscribed yet and trying to send message") 
    341399 
    342400        def sendMessage(self, message, channel): 
     
    432490 
    433491                ctr = 0 
    434                 if verbose: log.msg("messages:", type(messages), ":",  simplejson.dumps(messages)) 
     492                if verbose: log.msg("messages:", type(messages), ":",  simplejson.dumps(messages), " :", messages) 
    435493                aStream = None 
    436494                print( messages) 
     
    442500                        client = None 
    443501                        localResp = [] 
    444                         if "clientId" in m and m["clientId"] in self.clients: 
     502                        if "clientId" in m and m["clientId"] in self.clients\ 
     503                                        and self.clients[m["clientId"]].connection is not None: 
    445504                                client = self.clients[m["clientId"]] 
    446505                        else: 
     
    465524 
    466525                        chan = m["channel"] 
     526                        print "CHAN: !!!!!!!!!!!!!!!!! %s " % chan 
    467527                        if chan == "/meta/handshake" and ctr == 0: 
    468528                                # looks like we'll need to create a Connection 
     
    620680                # get the client and the channel here 
    621681                # self._subscribe() 
     682                print "self.subscribe ^^^^^^^^^^^^^^^^" 
    622683                if not self._checkClient(request, message): 
    623684                        # FIXME: we should probably send advice here instead of just raw failure 
     
    646707                # FIXME: should we be calling client.deliver and having *that* dispatch 
    647708                # down to the correct connection object? 
     709                print "Sending Message: with %s %s"  % (self.clients, client) 
    648710                client.connection.deliver(resp) 
    649711                # return { "successful": True } 
     
    679741        def _globbing_unsubscribe(self, client, chan): 
    680742                "remove a subscription" 
     743                client.unsubscribe(chan) 
    681744 
    682745                cparts = chan.split("/")[1:] 
     
    700763                if verbose: log.msg(cparts) 
    701764                root = self.subscriptions 
    702                 for part in cparts: # create parts of the topic tree that don't yet exist 
    703                         if not part in root: 
    704                                 if verbose: 
    705                                         log.msg("creating part: ", part) 
    706                                 root[part] = { "__cometd_subscribers": None } 
    707                                 # root[part] = weakref.WeakValueDictionary() 
    708                                 root[part]["__cometd_subscribers"] = weakref.WeakValueDictionary() 
    709  
    710                         root = root[part]  
    711  
    712                 root["__cometd_subscribers"][client.id] = client 
     765                # make sure the subscription was successful 
     766                if client.subscribe( chan ): 
     767                        for part in cparts: # create parts of the topic tree that don't yet exist 
     768                                if not part in root: 
     769                                        if verbose: 
     770                                                log.msg("creating part: ", part) 
     771                                        root[part] = { "__cometd_subscribers": None } 
     772                                        # root[part] = weakref.WeakValueDictionary() 
     773                                        root[part]["__cometd_subscribers"] = weakref.WeakValueDictionary() 
     774 
     775                                root = root[part]  
     776 
     777                        root["__cometd_subscribers"][client.id] = client 
    713778 
    714779        def _globbing_route(self, request, message): 
     
    758823                                subs[client].connection.deliver(message) 
    759824                return { "successful": True } 
    760  
    761825# vim:ts=4:noet: