| 1 | from twisted.web2 import resource, http, http_headers, stream |
|---|
| 2 | import random |
|---|
| 3 | import string |
|---|
| 4 | import cjson |
|---|
| 5 | import time |
|---|
| 6 | |
|---|
# Short module-level aliases for the cjson codec functions.
encodejson = cjson.encode
decodejson = cjson.decode

# HTTP status code sent back by ucomet.render() for malformed requests.
HTTP_ERROR = 406
|---|
| 11 | |
|---|
class Utils:
    """Namespace for small helpers: unique id generation and timestamps."""

    # Every id handed out so far.  ``None`` is pre-seeded so the generation
    # loop in makeId() always runs at least once.
    # NOTE: this set only ever grows; ids are never released.
    usedIds = set()
    usedIds.add(None)

    # Characters ids are built from.  ASCII only, so the result does not
    # depend on the process locale.
    _ID_ALPHABET = string.ascii_letters + string.digits

    # OS-backed random source: ids are used as client identifiers, so they
    # should not be predictable from previously issued ones.
    _rng = random.SystemRandom()

    @staticmethod
    def makeId(chars=32):
        """Return a random alphanumeric id that has not been returned before.

        chars -- length of the id; must be at least 1.

        Characters are drawn independently (repeats allowed), so any
        positive length works.  Raises ValueError for ``chars < 1`` because
        an empty id could only ever be issued once and the retry loop would
        then never terminate.
        """
        if chars < 1:
            raise ValueError("chars must be at least 1")
        new_id = None
        while new_id in Utils.usedIds:
            new_id = ''.join(Utils._rng.choice(Utils._ID_ALPHABET)
                             for _ in range(chars))
        Utils.usedIds.add(new_id)
        return new_id

    @staticmethod
    def getTime():
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.00``.

        The fractional part is the literal text ``.00``; it is not measured.
        """
        return time.strftime("%Y-%m-%dT%H:%M:%S.00", time.gmtime())
|---|
| 25 | |
|---|
class Connection(object):
    """Pairs a client's open response streams with its queued messages.

    ``streams`` holds one ``(stream, jsonp_callback)`` tuple per open
    request.  ``message_queue`` is a list of message batches (lists);
    ``message_queue[0]`` is the batch written to the next stream.
    """

    supported_connection_types = ["callback-polling", "long-polling"]

    def __init__(self):
        self.n_connections = 0
        self.streams = []        # (stream, jsonp callback name) tuples
        self.message_queue = []  # list of message batches (lists)

    @staticmethod
    def _jsonpCallback(args):
        """Return the JSONP callback name requested in ``args``, or "jsonp".

        Request argument values may be lists (render() reads
        ``args["message"][0]``), so a list/tuple value is unwrapped to its
        first element.  The name is echoed verbatim into the response body,
        so anything other than alphanumerics, ``_``, ``.`` and ``$`` is
        rejected in favour of the default.
        """
        requested = None
        if "jsonp" in args:
            requested = args["jsonp"]
        if isinstance(requested, (list, tuple)):
            requested = requested[0] if requested else None
        if not requested:
            return "jsonp"
        for ch in requested:
            if not (ch.isalnum() or ch in "_.$"):
                return "jsonp"
        return requested

    def addStream(self, message, args, addMessage=None):
        """Open a new producer stream and return the response wrapping it.

        message    -- the client message that triggered this (unused here).
        args       -- request arguments; an optional "jsonp" entry names the
                      callback the payload is wrapped in.
        addMessage -- optional message placed at the front of the batch
                      that is re-queued for this stream.
        """
        callback = self._jsonpCallback(args)
        newstream = stream.ProducerStream()
        self.streams.append((newstream, callback))

        # Reuse the most recently queued batch when there are already at
        # least as many batches as streams; otherwise start a fresh one.
        batch = []
        if len(self.streams) <= len(self.message_queue):
            batch = self.message_queue.pop()
        if addMessage:
            batch.insert(0, addMessage)
        self.message_queue.append(batch)
        print("Created Stream!!")
        return makeResponse(newstream)

    def connect(self, message, args):
        """Handle a connect message: open a stream and queue the reply.

        Returns the response produced by addStream().  If the first batch
        already holds more than the connect reply, it is delivered at once.
        """
        print("message connect %s" % (message,))
        reply = {
            "channel": "/meta/connect",
            "successful": True,
            "error": None,
            "clientId": message["clientId"],
            "advice": {"reconnect": "retry"},
        }
        response = self.addStream(message, args, reply)
        if len(self.message_queue[0]) > 1:
            self.deliver()
        return response

    def flushStreams(self):
        """Deliver queued batches until no stream is left or none can be sent.

        Stops as soon as a deliver() call consumes no stream (nothing is
        queued for it), instead of looping forever.
        """
        while self.streams:
            pending = len(self.streams)
            self.deliver()
            if len(self.streams) == pending:
                break

    def deliver(self, message=None):
        """Queue ``message`` (if given) and write the first batch to a stream.

        The batch is written as ``callback(json)`` to the oldest open
        stream, which is then finished.  If there is no open stream, or
        nothing is queued, the message simply stays in the queue.
        """
        # FIXME we might need handling for 0 len messages
        if message:
            if not self.message_queue:
                self.message_queue.append([message])
            else:
                self.message_queue[0].append(message)

        # Guard against an empty queue before looking at its first batch.
        if self.streams and self.message_queue and self.message_queue[0]:
            out, jsoncallback = self.streams.pop(0)
            payload = "%s(%s)" % (jsoncallback,
                                  encodejson(self.message_queue.pop(0)))
            print("delivering %s" % payload)
            out.write(payload)
            out.finish()
        else:
            print("might be delivering eventually %s" % (message,))
|---|
| 88 | |
|---|
class ClientSubscription(object):
    """One client's subscription to a single channel.

    Received messages are forwarded to ``sendMessageCallback`` together
    with the channel name.  ``subscribe`` and ``unsubscribe`` are empty
    hooks for subclasses.
    """

    def __init__(self, channel, sendMessageCallback):
        # The callback is mandatory: sendMessage() calls it unconditionally.
        self.sendMessageCallback = sendMessageCallback
        self.channel = channel

    def subscribe(self):
        """Hook run when the subscription is created; no-op by default.

        Overrides should call this base implementation first.
        """
        return None

    def unsubscribe(self):
        """Hook run when the client unsubscribes or is disconnected."""
        return None

    def sendMessage(self, message):
        """Pass ``message`` and this channel to the send callback.

        Overrides should call this base implementation first.
        """
        forward = self.sendMessageCallback
        forward(message, self.channel)

    def messageReceived(self, message):
        """Handle an incoming message by sending it on via sendMessage()."""
        self.sendMessage(message)
|---|
| 110 | |
|---|
class Client(object):
    """State for one client: its Connection and its channel subscriptions.

    ``subscriptions`` maps channel name to a ``ClientSubscriptionType``
    instance.  Handlers return a plain dict that the caller sends back to
    the client; failures are reported as ``{"error": <text>}``.
    """

    def __init__(self,
                 id=None,
                 ClientSubscriptionType=ClientSubscription):
        self.ClientSubscriptionType = ClientSubscriptionType
        self.connection = Connection()
        self.subscriptions = {}
        # A falsy id (None, "") means "generate one".
        self.id = id or Utils.makeId()

    def connect(self, msg, args):
        """Delegate a connect message to the underlying Connection."""
        return self.connection.connect(msg, args)

    def subscribe(self, msg, args):
        """Subscribe to ``msg["subscription"]``.

        Returns the /meta/subscribe acknowledgement, or an error dict when
        the field is missing or the channel is already subscribed.
        """
        channel = msg.get("subscription")
        if channel is None:
            return {"error": "subscription is required in message"}
        if channel in self.subscriptions:
            return {"error": "already subscribed"}

        self.subscriptions[channel] = \
            self.ClientSubscriptionType(channel, self.sendMessage)
        self.subscriptions[channel].subscribe()
        ack = {
            "channel": "/meta/subscribe",
            "subscription": channel,
            "successful": True,
        }
        return self._subunsubscribe(msg, args, ack)

    def unsubscribe(self, msg, args):
        """Drop the subscription for ``msg["subscription"]``.

        Returns the /meta/unsubscribe acknowledgement, or an error dict
        when the field is missing or the channel was not subscribed.
        """
        channel = msg.get("subscription")
        if channel is None:
            return {"error": "subscription is required in message"}
        if channel not in self.subscriptions:
            return {"error": "not even subscribed"}

        self.subscriptions[channel].unsubscribe()
        del self.subscriptions[channel]
        ack = {
            "channel": "/meta/unsubscribe",
            "subscription": channel,
            "successful": True,
        }
        return self._subunsubscribe(msg, args, ack)

    def dispatch(self, msg, args):
        """Hand ``msg["data"]`` to the subscription for ``msg["channel"]``.

        Returns a success dict, or an error dict when "data" is missing or
        the client is not subscribed to the channel.
        """
        channel = msg["channel"]
        if "data" not in msg:
            return {"error": "data is required in message"}
        if channel not in self.subscriptions:
            return {"error": "not subscribed to %s" % channel}

        self.subscriptions[channel].messageReceived(msg["data"])
        return {"successful": True, "channel": channel}

    def _subunsubscribe(self, msg, args, nmsg):
        """Flush the connection's open streams and return ``nmsg`` as is.

        ``msg`` and ``args`` are accepted for call-site symmetry but are
        not used.
        """
        self.connection.flushStreams()
        return nmsg

    def sendMessage(self, message, channel):
        """Queue ``message`` for this client, tagged with channel and id.

        Raises Exception when the client has no connection.
        """
        if self.connection is None:
            # FIXME make own exception
            raise Exception("No connection? :(")
        self.connection.deliver(
            {"data": message, "channel": channel, "clientId": self.id})
|---|
| 181 | |
|---|
| 182 | |
|---|
| 183 | |
|---|
| 184 | |
|---|
def makeResponse(stream, type="text/javascript", headers=None, code=200):
    """Build an ``http.Response`` around ``stream``.

    stream  -- response body, passed to ``http.Response`` unchanged.
    type    -- value of the Content-type header.
    headers -- optional mapping of extra raw header names to values.
    code    -- HTTP status code.
    """
    newHeaders = http_headers.Headers()
    newHeaders.addRawHeader("Content-type", type)
    # ``headers`` defaults to None rather than a shared mutable dict.
    if headers:
        for name, value in headers.items():
            newHeaders.addRawHeader(name, value)
    # Wrap in a tuple so a tuple-valued body cannot break the formatting.
    print("RESPONSE STREAM: %s" % (stream,))
    return http.Response(code=code, headers=newHeaders, stream=stream)
|---|
| 191 | |
|---|
class ucomet(resource.PostableResource):
    """Web resource that decodes client messages and routes them by channel.

    The request must carry a "message" argument holding a JSON array of
    message dicts.  Known clients are kept in ``self.clients`` keyed by
    client id.
    """

    version = 1.0
    min_version = 1.0

    def __init__(self, ClientType=Client,
                 ClientSubscriptionType=ClientSubscription):
        self.clients = {}
        self.ClientType = ClientType
        # Propagated to every client created in handshake().
        self.ClientSubscriptionType = ClientSubscriptionType

    def render(self, rcx):
        """Process every message in the request and return a response.

        Malformed requests get a text/plain response with HTTP_ERROR.
        A /meta/handshake message is answered immediately; any messages
        after it in the same request are not processed.  If a connect
        message opened a stream (and the last message belonged to a known
        client), the collected responses are queued on that client's
        connection and the stream response is returned; otherwise the
        responses are returned directly as JSON.
        """
        args = rcx.args
        if "message" not in args:
            return makeResponse("There's no message argument",
                                type="text/plain", code=HTTP_ERROR)
        try:
            messages = decodejson(args["message"][0])
        except cjson.DecodeError:
            return makeResponse("Message must be in proper JSON",
                                type="text/plain", code=HTTP_ERROR)
        if not isinstance(messages, list):
            return makeResponse("Messages must be encapsulated in array",
                                type="text/plain", code=HTTP_ERROR)
        print("messages %s" % (messages,))

        response = None    # stream response created by a connect message
        responses = []
        connection = None  # connection of the last known client seen
        client = None
        for m in messages:
            client = None

            # Validate the shape before touching any keys: a non-dict
            # entry must not be probed for "clientId".
            if not isinstance(m, dict):
                responses.append({"error": "each message must be in dict format"})
                print("NOT DICT")
                continue

            # Let's see if we already have a client.
            try:
                client = self.clients.get(m.get("clientId"))
            except TypeError:
                # An unhashable clientId can never match a known client.
                client = None
            if client is not None:
                connection = client.connection  # it still may be None

            if "channel" not in m:
                responses.append({"error": "channel is required in message"})
                print("NO CHANNEL")
                continue

            channel = m["channel"]

            if channel == "/meta/handshake":
                return self.handshake(m, args)

            # Every other channel needs an already handshaken client.
            if client is None:
                responses.append({"error": "unknown or missing clientId"})
                print("NO CLIENT")
                continue

            if channel == "/meta/connect" or channel == "/meta/reconnect":
                print("M %s" % (m,))
                response = client.connect(m, args)
            elif channel == "/meta/subscribe":
                responses.append(client.subscribe(m, args))
            elif channel == "/meta/unsubscribe":
                responses.append(client.unsubscribe(m, args))
            else:
                responses.append(client.dispatch(m, args))

        if not response or not client:
            return makeResponse(encodejson(responses))

        for r in responses:
            connection.deliver(r)
        return response

    def handshake(self, message, args):
        """Create a new client and return the handshake response.

        The message's "id" is echoed back when present (None otherwise).
        """
        # FIXME add error handling if we're already hand shaken
        client = self.ClientType(ClientSubscriptionType=self.ClientSubscriptionType)
        self.clients[client.id] = client
        return makeResponse(encodejson([{
            "channel": "/meta/handshake",
            "version": self.version,
            "minimumVersion": self.min_version,
            "supportedConnectionTypes": Connection.supported_connection_types,
            "clientId": client.id,
            "successful": True,
            "error": None,
            "id": message.get("id"),
        }]))
|---|
| 288 | |
|---|
| 289 | |
|---|
| 290 | |
|---|
| 291 | |
|---|
| 292 | |
|---|
| 293 | |
|---|
| 294 | |
|---|