Changeset 45 for ucomet.py

Show
Ignore:
Timestamp:
02/08/08 19:24:35 (4 years ago)
Author:
mike
Message:

Got ucommet working

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • ucomet.py

    r44 r45  
    11from twisted.web2 import resource, http, http_headers, stream 
    2 from random import random 
     2import random 
    33import string 
    44import cjson 
     
    1414    usedIds.add(None) 
    1515    @staticmethod 
    16     def getId(chars=32): 
     16    def makeId(chars=32): 
    1717        id = None 
    1818        while id in Utils.usedIds: 
    19             id = ''.join(random.sample(string.letters+string.digits, 12))  
     19            id = ''.join(random.sample(string.letters+string.digits, chars))  
    2020        Utils.usedIds.add(id) 
    2121        return id 
     
    4040                jsonp = "jsonp" 
    4141        newstream = stream.ProducerStream() 
    42         self.streams.append(newstream) 
    43         if len(self.streams) > len(self.message_queue): 
    44             amsg = [] 
    45         else: 
     42        self.streams.append((newstream,jsonp)) 
     43        amsg = [] 
     44        if len(self.streams) <= len(self.message_queue): 
    4645            amsg = self.message_queue.pop()  
    4746        if addMessage: amsg.insert(0,addMessage) 
    4847        self.message_queue.append(amsg) 
    49         return newstream 
     48        print "Created Stream!!" 
     49        return makeResponse(newstream) 
    5050         
    5151     
    5252    def connect(self, message, args): 
     53        print "message connect  %s" % message 
    5354        msg = { 
    5455            "channel":   "/meta/connect", 
     
    5859            "advice":    {"reconnect": "retry"} 
    5960        } 
    60         return self.addStream(message, args, msg) 
     61        str = self.addStream(message, args, msg) 
     62        if len(self.message_queue[0]) > 1: 
     63            self.deliver() 
     64        return str 
    6165     
    6266    def flushStreams(self): 
     
    7175                self.message_queue[0].append(message) 
    7276         
    73         if len(self.streams): #and len(self.message_queue[0]): 
     77        if len(self.streams) and len(self.message_queue[0]): 
    7478            stream,jsoncallback = self.streams.pop(0) 
     79            print "delivering %s" % ( "%s(%s)" % (jsoncallback, 
     80                                      encodejson(self.message_queue[0]))) 
    7581            stream.write( "%s(%s)" % (jsoncallback, 
    76                                       encodejson(self.message_queue[0]))) 
     82                                      encodejson(self.message_queue.pop(0)))) 
    7783             
    7884            stream.finish() 
    7985            del stream 
     86        else: 
     87            "might be delivering eventually %s" % message 
    8088 
    8189class ClientSubscription(object): 
     
    106114                 ClientSubscriptionType = ClientSubscription): 
    107115        self.ClientSubscriptionType = ClientSubscriptionType 
    108         self.connection = None 
     116        self.connection = Connection() 
    109117        self.subscriptions = {} 
    110118        if id: 
     
    114122             
    115123    def connect(self,msg,args): 
    116         if not self.connection: 
    117             self.connection = Connection() 
    118124        return self.connection.connect(msg, args) 
    119125         
     
    134140            return self._subunsubscribe(msg, args, nmsg) 
    135141        else: 
    136             return makeResponse( 
    137                 encodejson([{"error":"already subscribed"}]), 
    138                            code = HTTP_ERROR) 
     142            return {"error":"already subscribed"} 
    139143         
    140144    def unsubscribe(self,msg,args): 
     
    150154            return self._subunsubscribe(msg, args, nmsg) 
    151155        else: 
    152             return makeResponse( 
    153                 encodejson([{"error":"not even subscribed"}]), 
    154                            code = HTTP_ERROR) 
     156            return {"error":"not even subscribed"} 
    155157     
    156158    def dispatch(self,msg,args): 
    157159        channel = msg["channel"] 
    158160        data = msg["data"] 
    159         if channel in self.suscriptions: 
    160             self.subscriptions[channel].messageReceived(data)             
    161         else: 
    162             return makeResponse( 
    163                 encodejson([{"error":"not subscribed to %s" % channel}]), 
    164                            code = HTTP_ERROR) 
    165          
     161        if channel in self.subscriptions: 
     162            #conn = self.connection.addStream(msg, args) 
     163            self.subscriptions[channel].messageReceived(data)   
     164            return {"successful":True,"channel":channel} 
     165        else: 
     166            return {"error":"not subscribed to %s" % channel}         
    166167         
    167168    def _subunsubscribe(self,msg,args,nmsg): 
    168169        self.connection.flushStreams() 
    169         stream = self.connection.addStream(msg, args,nmsg) 
    170         self.connection.deliver() 
    171         return stream 
     170        #stream = self.connection.addStream(msg, args,nmsg) 
     171        #self.connection.deliver() 
     172        return nmsg 
    172173     
    173174     
    174175    def sendMessage(self, message, channel): 
    175176        if self.connection is not None: 
    176             self.connection.deliver({"data":message, "channel":channel}) 
     177            self.connection.deliver({"data":message, "channel":channel, "clientId":self.id}) 
    177178        else: 
    178179            #FIXME make own exception 
     
    186187    newHeaders.addRawHeader("Content-type", type) 
    187188    for (k,v) in headers.items(): newHeaders.addRawHeader(k, v) 
     189    print "RESPONSE STREAM: %s" % stream 
    188190    return http.Response(code=code, headers=newHeaders, stream=stream) 
    189191     
     
    213215        if type(messages) is not list: 
    214216            return makeResponse("Messages must be encapsulated in array", 
    215                               type="text/plain", code=HTTP_ERROR)   
     217                              type="text/plain", code=HTTP_ERROR) 
     218        print "messages %s" % messages 
     219        stream = None 
     220        responses = [] 
    216221        connection = None 
    217         responses = [] 
    218222        for m in messages: 
    219223             
     
    224228                clientId = m["clientId"] 
    225229                if clientId in self.clients: 
    226                     client = self.client[client] 
     230                    client = self.clients[clientId] 
    227231                    connection = client.connection #it still may be None 
    228232             
    229             if type(m) is dict: 
     233            if type(m) is not dict: 
    230234                responses.append({"error":"each message must be in dict format"}) 
     235                print("NOT DICT") 
    231236                continue 
    232237             
    233238            if "channel" not in m: 
    234239                responses.append({"error":"channel is required in message"}) 
     240                print("NO CHANNEL") 
    235241                continue 
    236242             
     
    238244             
    239245            if channel == "/meta/handshake": 
    240                 return self.handshake(args,m) 
     246                resp = self.handshake(m,args) 
     247                return resp 
     248             
     249             
     250             
     251             
    241252            if channel == "/meta/connect" or \ 
    242253               channel == "/meta/reconnect": 
    243                 connection = client.connect(args,m) 
    244             if channel == "/meta/subscribe": 
    245                 connection = client.subscribe(args,m) 
    246             if channel == "/meta/unsubscribe": 
    247                 connection = client.unsubscribe(args,m) 
    248             else: connection = client.dispatch(args,m) 
    249          
    250         return connection 
    251              
    252              
    253          
    254     def handshake(self, args, message): 
     254                print "M %s" %m 
     255                stream = client.connect(m,args) 
     256            elif channel == "/meta/subscribe": 
     257                responses.append(client.subscribe(m,args)) 
     258            elif channel == "/meta/unsubscribe": 
     259                responses.append(client.unsubscribe(m,args)) 
     260            else:  
     261                responses.append(client.dispatch(m,args)) 
     262         
     263        if not stream or not client: 
     264            return makeResponse(encodejson(responses)) 
     265        else: 
     266            for r in responses: 
     267                connection.deliver(r) 
     268         
     269        return stream 
     270             
     271             
     272         
     273    def handshake(self, message, args): 
    255274        #FIXME add error handlign if we're already hand shaken 
    256275         
    257276        client = self.ClientType(ClientSubscriptionType=self.ClientSubscriptionType) 
    258277        self.clients[client.id] = client 
    259         return { 
     278        return makeResponse(encodejson([{ 
    260279            "channel"                 :"/meta/handshake", 
    261280            "version"                 :self.version, 
     
    266285            "error"                   :None, 
    267286            "id"                      :message["id"] 
    268         } 
    269      
    270              
    271              
    272              
    273              
    274      
    275      
     287        }])) 
     288     
     289             
     290             
     291             
     292             
     293     
     294