root / ucomet.py

Revision 45, 9.8 kB (checked in by mike, 9 months ago)

Got ucommet working

Line 
1from twisted.web2 import resource, http, http_headers, stream
2import random
3import string
4import cjson
5import time
6
7encodejson = cjson.encode
8decodejson = cjson.decode
9
10HTTP_ERROR = 406
11
12class Utils:
13    usedIds = set() #although highly unlikely, no collisions
14    usedIds.add(None)
15    @staticmethod
16    def makeId(chars=32):
17        id = None
18        while id in Utils.usedIds:
19            id = ''.join(random.sample(string.letters+string.digits, chars)) 
20        Utils.usedIds.add(id)
21        return id
22    @staticmethod
23    def getTime():
24        return time.strftime("%Y-%m-%dT%H:%M:%S.00", time.gmtime())
25   
26class Connection(object):
27    supported_connection_types = ["callback-polling", "long-polling"]
28   
29    def __init__(self):
30        self.n_connections = 0
31        self.streams = [] #make sure these are with (stream, jsoncallbac)
32        self.message_queue = [] #array of arrays
33   
34    def addStream(self, message, args, addMessage = None):
35        jsonp = ""
36        if jsonp in args:
37            if args["jsonp"] is not None:
38                jsonp = args["jsonp"]
39            else:
40                jsonp = "jsonp"
41        newstream = stream.ProducerStream()
42        self.streams.append((newstream,jsonp))
43        amsg = []
44        if len(self.streams) <= len(self.message_queue):
45            amsg = self.message_queue.pop() 
46        if addMessage: amsg.insert(0,addMessage)
47        self.message_queue.append(amsg)
48        print "Created Stream!!"
49        return makeResponse(newstream)
50       
51   
52    def connect(self, message, args):
53        print "message connect  %s" % message
54        msg = {
55            "channel":   "/meta/connect",
56            "successful":True,
57            "error":     None,
58            "clientId":  message["clientId"],
59            "advice":    {"reconnect": "retry"}
60        }
61        str = self.addStream(message, args, msg)
62        if len(self.message_queue[0]) > 1:
63            self.deliver()
64        return str
65   
66    def flushStreams(self):
67        while len(self.streams):
68            self.deliver()
69    def deliver(self, message = None):
70        #FIXME we might need handling for 0 len messages
71        if message:
72            if len(self.message_queue) == 0:
73                self.message_queue.append([message])
74            else:
75                self.message_queue[0].append(message)
76       
77        if len(self.streams) and len(self.message_queue[0]):
78            stream,jsoncallback = self.streams.pop(0)
79            print "delivering %s" % ( "%s(%s)" % (jsoncallback,
80                                      encodejson(self.message_queue[0])))
81            stream.write( "%s(%s)" % (jsoncallback,
82                                      encodejson(self.message_queue.pop(0))))
83           
84            stream.finish()
85            del stream
86        else:
87            "might be delivering eventually %s" % message
88
89class ClientSubscription(object):
90   
91
92    def __init__( self, channel, sendMessageCallback ):
93        self.channel = channel
94        self.sendMessageCallback = sendMessageCallback #Make sure we set this!!!
95
96    def messageReceived(self, message):
97        self.sendMessage(message)
98   
99    #Make sure you call this first if you override this message!!!
100    def sendMessage(self, message):
101        self.sendMessageCallback(message, self.channel)
102   
103    #Make sure you call this first if you override this message!!!
104    def subscribe(self):
105        pass
106   
107    #when we get disconnected or unsubscribed
108    def unsubscribe(self):
109        pass
110
111class Client(object):
112    def __init__(self,
113                 id=None,
114                 ClientSubscriptionType = ClientSubscription):
115        self.ClientSubscriptionType = ClientSubscriptionType
116        self.connection = Connection()
117        self.subscriptions = {}
118        if id:
119            self.id = id
120        else:
121            self.id = Utils.makeId()
122           
123    def connect(self,msg,args):
124        return self.connection.connect(msg, args)
125       
126   
127   
128    def subscribe(self,msg,args):
129        channel = msg["subscription"]
130       
131        if channel not in self.subscriptions:
132            self.subscriptions[channel] = \
133                self.ClientSubscriptionType(channel,self.sendMessage)
134            self.subscriptions[channel].subscribe()   
135            nmsg = {
136                "channel":     "/meta/subscribe",
137                "subscription":channel,
138                "successful":  True
139            }
140            return self._subunsubscribe(msg, args, nmsg)
141        else:
142            return {"error":"already subscribed"}
143       
144    def unsubscribe(self,msg,args):
145        channel = msg["subscription"]
146        if channel in self.subscriptions:
147            self.subscriptions[channel].unsubscribe()
148            del self.subscriptions[channel]
149            nmsg = {
150                "channel":     "/meta/unsubscribe",
151                "subscription":channel,
152                "successful":  True
153            }
154            return self._subunsubscribe(msg, args, nmsg)
155        else:
156            return {"error":"not even subscribed"}
157   
158    def dispatch(self,msg,args):
159        channel = msg["channel"]
160        data = msg["data"]
161        if channel in self.subscriptions:
162            #conn = self.connection.addStream(msg, args)
163            self.subscriptions[channel].messageReceived(data) 
164            return {"successful":True,"channel":channel}
165        else:
166            return {"error":"not subscribed to %s" % channel}       
167       
168    def _subunsubscribe(self,msg,args,nmsg):
169        self.connection.flushStreams()
170        #stream = self.connection.addStream(msg, args,nmsg)
171        #self.connection.deliver()
172        return nmsg
173   
174   
175    def sendMessage(self, message, channel):
176        if self.connection is not None:
177            self.connection.deliver({"data":message, "channel":channel, "clientId":self.id})
178        else:
179            #FIXME make own exception
180            raise Exception("No connection? :(")
181       
182       
183           
184       
185def makeResponse(stream, type="text/javascript", headers={}, code=200):
186    newHeaders = http_headers.Headers()
187    newHeaders.addRawHeader("Content-type", type)
188    for (k,v) in headers.items(): newHeaders.addRawHeader(k, v)
189    print "RESPONSE STREAM: %s" % stream
190    return http.Response(code=code, headers=newHeaders, stream=stream)
191   
192class ucomet(resource.PostableResource):
193    version = 1.0
194    min_version = 1.0
195    def __init__(self, ClientType = Client,
196                 ClientSubscriptionType = ClientSubscription):
197        self.clients = {}
198        self.ClientType = ClientType
199        #we will propogate this to the clients
200        self.ClientSubscriptionType = ClientSubscriptionType
201   
202    def render(self, rcx):
203        args = rcx.args
204        messages = None
205        if "message" in args:
206            try:
207                messages = decodejson( args["message"][0] )
208            except cjson.DecodeError:
209                return makeResponse("Message must be in proper JSON",
210                                   type="text/plain", code=HTTP_ERROR)
211        else:
212            return makeResponse("There's no message argument",
213                              type="text/plain", code=HTTP_ERROR)
214           
215        if type(messages) is not list:
216            return makeResponse("Messages must be encapsulated in array",
217                              type="text/plain", code=HTTP_ERROR)
218        print "messages %s" % messages
219        stream = None
220        responses = []
221        connection = None
222        for m in messages:
223           
224            clientId = None
225            client = None
226            #Let's see if we already have a client
227            if "clientId" in m:
228                clientId = m["clientId"]
229                if clientId in self.clients:
230                    client = self.clients[clientId]
231                    connection = client.connection #it still may be None
232           
233            if type(m) is not dict:
234                responses.append({"error":"each message must be in dict format"})
235                print("NOT DICT")
236                continue
237           
238            if "channel" not in m:
239                responses.append({"error":"channel is required in message"})
240                print("NO CHANNEL")
241                continue
242           
243            channel = m["channel"]
244           
245            if channel == "/meta/handshake":
246                resp = self.handshake(m,args)
247                return resp
248           
249           
250           
251           
252            if channel == "/meta/connect" or \
253               channel == "/meta/reconnect":
254                print "M %s" %m
255                stream = client.connect(m,args)
256            elif channel == "/meta/subscribe":
257                responses.append(client.subscribe(m,args))
258            elif channel == "/meta/unsubscribe":
259                responses.append(client.unsubscribe(m,args))
260            else: 
261                responses.append(client.dispatch(m,args))
262       
263        if not stream or not client:
264            return makeResponse(encodejson(responses))
265        else:
266            for r in responses:
267                connection.deliver(r)
268       
269        return stream
270           
271           
272       
273    def handshake(self, message, args):
274        #FIXME add error handlign if we're already hand shaken
275       
276        client = self.ClientType(ClientSubscriptionType=self.ClientSubscriptionType)
277        self.clients[client.id] = client
278        return makeResponse(encodejson([{
279            "channel"                 :"/meta/handshake",
280            "version"                 :self.version,
281            "minimumVersion"          :self.min_version,
282            "supportedConnectionTypes":Connection.supported_connection_types,
283            "clientId"                :client.id,
284            "successful"              :True,
285            "error"                   :None,
286            "id"                      :message["id"]
287        }]))
288   
289           
290           
291           
292           
293   
294   
Note: See TracBrowser for help on using the browser.