root/xcap/interfaces/backend/sipthor.py

Revision 717, 18.8 KB (checked in by Saul Ibarra <saul@ag-projects.com>, 4 days ago)

Fixed PUT with If-Match header

Line 
1
2# Copyright (C) 2007-2010 AG-Projects.
3#
4# This module is proprietary to AG Projects. Use of this module by third
5# parties is not supported.
6
7import signal
8
9import cjson
10
11from formencode import validators
12
13from application import log
14from application.python.util import Singleton
15from application.system import default_host_ip
16from application.process import process
17from application.configuration import ConfigSection, ConfigSetting
18
19from sqlobject import sqlhub, connectionForURI, SQLObject, AND
20from sqlobject import StringCol, IntCol, DateTimeCol, SOBLOBCol, Col
21from sqlobject import MultipleJoin, ForeignKey
22
23from zope.interface import implements
24from twisted.internet import reactor
25from twisted.internet import defer
26from twisted.internet.defer import Deferred, maybeDeferred
27from twisted.cred.checkers import ICredentialsChecker
28from twisted.cred.credentials import IUsernamePassword, IUsernameHashedPassword
29from twisted.cred.error import UnauthorizedLogin
30
31from thor.link import ControlLink, Notification, Request
32from thor.eventservice import EventServiceClient, ThorEvent
33from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity
34
35from gnutls.interfaces.twisted import X509Credentials
36from gnutls.constants import COMP_DEFLATE, COMP_LZO, COMP_NULL
37
38import xcap
39from xcap.tls import Certificate, PrivateKey
40from xcap.interfaces.backend import StatusResponse
41from xcap.dbutil import make_random_etag
42
43
44class ThorNodeConfig(ConfigSection):
45    __cfgfile__ = xcap.__cfgfile__
46    __section__ = 'ThorNetwork'
47
48    domain = "sipthor.net"
49    multiply = 1000
50    certificate = ConfigSetting(type=Certificate, value=None)
51    private_key = ConfigSetting(type=PrivateKey, value=None)
52    ca = ConfigSetting(type=Certificate, value=None)
53
54
55
56class JSONValidator(validators.Validator):
57
58    def to_python(self, value, state):
59        if value is None:
60            return None
61        try:
62            return cjson.decode(value)
63        except Exception:
64            raise validators.Invalid("expected a decodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value), value, state)
65
66    def from_python(self, value, state):
67        if value is None:
68            return None
69        try:
70            return cjson.encode(value)
71        except Exception:
72            raise validators.Invalid("expected an encodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value), value, state)
73
74
75class SOJSONCol(SOBLOBCol):
76
77    def createValidators(self):
78        return [JSONValidator()] + super(SOJSONCol, self).createValidators()
79
80
81class JSONCol(Col):
82    baseClass = SOJSONCol
83
84
85class SipAccount(SQLObject):
86    class sqlmeta:
87        table = 'sip_accounts_meta'
88    username   = StringCol(length=64)
89    domain     = StringCol(length=64)
90    firstName  = StringCol(length=64)
91    lastName   = StringCol(length=64)
92    email      = StringCol(length=64)
93    customerId = IntCol(default=0)
94    resellerId = IntCol(default=0)
95    ownerId    = IntCol(default=0)
96    changeDate = DateTimeCol(default=DateTimeCol.now)
97    ## joins
98    data       = MultipleJoin('SipAccountData', joinColumn='account_id')
99
100    def _set_profile(self, value):
101        data = list(self.data)
102        if not data:
103            SipAccountData(account=self, profile=value)
104        else:
105            data[0].profile = value
106
107    def _get_profile(self):
108        return self.data[0].profile
109
110    def set(self, **kwargs):
111        kwargs = kwargs.copy()
112        profile = kwargs.pop('profile', None)
113        SQLObject.set(self, **kwargs)
114        if profile is not None:
115            self._set_profile(profile)
116
117
118class SipAccountData(SQLObject):
119    class sqlmeta:
120        table = 'sip_accounts_data'
121    account  = ForeignKey('SipAccount', cascade=True)
122    profile  = JSONCol()
123
124
125def sanitize_application_id(application_id):
126    if application_id == "org.openmobilealliance.pres-rules":
127        return "pres-rules"
128    else:
129        return application_id
130
131class ThorEntityAddress(str):
132    def __new__(cls, ip, control_port=None, version='unknown'):
133        instance = str.__new__(cls, ip)
134        instance.ip = ip
135        instance.version = version
136        instance.control_port = control_port
137        return instance
138
139
140class GetSIPWatchers(Request):
141    def __new__(cls, account):
142        command = "get sip_watchers for %s" % account
143        instance = Request.__new__(cls, command)
144        return instance
145
146
147class XCAPProvisioning(EventServiceClient):
148    __metaclass__ = Singleton
149    topics = ["Thor.Members"]
150
151    def __init__(self):
152        self._database = DatabaseConnection()
153        self.node = ThorEntity(default_host_ip, ['xcap_server'], version=xcap.__version__)
154        self.networks = {}
155        self.presence_message = ThorEvent('Thor.Presence', self.node.id)
156        self.shutdown_message = ThorEvent('Thor.Leave', self.node.id)
157        credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca])
158        credentials.verify_peer = True
159        credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL)
160        self.control = ControlLink(credentials)
161        EventServiceClient.__init__(self, ThorNodeConfig.domain, credentials)
162        process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP)
163        process.signals.add_handler(signal.SIGINT, self._handle_SIGINT)
164        process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM)
165
166    def _disconnect_all(self, result):
167        self.control.disconnect_all()
168        EventServiceClient._disconnect_all(self, result)
169
170    def lookup(self, key):
171        network = self.networks.get("sip_proxy", None)
172        if network is None:
173            return None
174        try:
175            node = network.lookup_node(key)
176        except LookupError:
177            node = None
178        except Exception:
179            log.err()
180            node = None
181        return node
182
183    def notify(self, operation, entity_type, entity):
184        node = self.lookup(entity)
185        if node is not None:
186            if node.control_port is None:
187                log.error("Could not send notify because node %s has no control port" % node.ip)
188                return
189            self.control.send_request(Notification("notify %s %s %s" % (operation, entity_type, entity)), (node.ip, node.control_port))
190
191    def get_watchers(self, key):
192        node = self.lookup(key)
193        if node is None:
194            return defer.fail("no nodes found when searching for key %s" % str(key))
195        if node.control_port is None:
196            return defer.fail("could not send notify because node %s has no control port" % node.ip)
197        request = GetSIPWatchers(key)
198        request.deferred = Deferred()
199        self.control.send_request(request, (node.ip, node.control_port))
200        return request.deferred
201
202    def handle_event(self, event):
203        # print "Received event: %s" % event
204        networks = self.networks
205        role_map = ThorEntitiesRoleMap(event.message) ## mapping between role names and lists of nodes with that role
206        thor_databases = role_map.get('thor_database', [])
207        if thor_databases:
208            thor_databases.sort(lambda x, y: cmp(x.priority, y.priority) or cmp(x.ip, y.ip))
209            dburi = thor_databases[0].dburi
210        else:
211            dburi = None
212        self._database.update_dburi(dburi)
213        all_roles = role_map.keys() + networks.keys()
214        for role in all_roles:
215            try:
216                network = networks[role] ## avoid setdefault here because it always evaluates the 2nd argument
217            except KeyError:
218                from thor import network as thor_network
219                if role in ["thor_manager", "thor_monitor", "provisioning_server", "media_relay", "thor_database"]:
220                    continue
221                else:
222                    network = thor_network.new(ThorNodeConfig.multiply)
223                networks[role] = network
224            new_nodes = set([ThorEntityAddress(node.ip, getattr(node, 'control_port', None), getattr(node, 'version', 'unknown')) for node in role_map.get(role, [])])
225            old_nodes = set(network.nodes)
226            ## compute set differences
227            added_nodes = new_nodes - old_nodes
228            removed_nodes = old_nodes - new_nodes
229            if removed_nodes:
230                for node in removed_nodes:
231                    network.remove_node(node)
232                    self.control.discard_link((node.ip, node.control_port))
233                plural = len(removed_nodes) != 1 and 's' or ''
234                log.msg("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes)))
235            if added_nodes:
236                for node in added_nodes:
237                    network.add_node(node)
238                plural = len(added_nodes) != 1 and 's' or ''
239                log.msg("added %s node%s: %s" % (role, plural, ', '.join(added_nodes)))
240            #print "Thor %s nodes: %s" % (role, str(network.nodes))
241
242
243class NotFound(Exception):
244    pass
245
246
247class NoDatabase(Exception):
248    pass
249
250
251class DatabaseConnection(object):
252    __metaclass__ = Singleton
253
254    def __init__(self):
255        self.dburi = None
256
257    # Methods to be called from the Twisted thread:
258    def put(self, uri, document, check_etag, new_etag):
259        defer = Deferred()
260        operation = lambda profile: self._put_operation(uri, document, check_etag, new_etag, profile)
261        reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer)
262        return defer
263
264    def delete(self, uri, check_etag):
265        defer = Deferred()
266        operation = lambda profile: self._delete_operation(uri, check_etag, profile)
267        reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer)
268        return defer
269
270    def get(self, uri):
271        defer = Deferred()
272        operation = lambda profile: self._get_operation(uri, profile)
273        reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, False, defer)
274        return defer
275
276    def get_profile(self, username, domain):
277        defer = Deferred()
278        reactor.callInThread(self.retrieve_profile, username, domain, lambda profile: profile, False, defer)
279        return defer
280
281    def get_documents_list(self, uri):
282        defer = Deferred()
283        operation = lambda profile: self._get_documents_list_operation(uri, profile)
284        reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, False, defer)
285        return defer
286
287
288    # Methods to be called in a separate thread:
289    def _put_operation(self, uri, document, check_etag, new_etag, profile):
290        application_id = sanitize_application_id(uri.application_id)
291        xcap_docs = profile.setdefault("xcap", {})
292        try:
293            etag = xcap_docs[application_id][uri.doc_selector.document_path][1]
294        except KeyError:
295            found = False
296            check_etag(None, False)
297        else:
298            found = True
299            check_etag(etag)
300        xcap_app = xcap_docs.setdefault(application_id, {})
301        xcap_app[uri.doc_selector.document_path] = (document, new_etag)
302        return found
303
304    def _delete_operation(self, uri, check_etag, profile):
305        application_id = sanitize_application_id(uri.application_id)
306        xcap_docs = profile.setdefault("xcap", {})
307        try:
308            etag = xcap_docs[application_id][uri.doc_selector.document_path][1]
309        except KeyError:
310            raise NotFound()
311        check_etag(etag)
312        del(xcap_docs[application_id][uri.doc_selector.document_path])
313        return None
314
315    def _get_operation(self, uri, profile):
316        try:
317            xcap_docs = profile["xcap"]
318            doc, etag = xcap_docs[sanitize_application_id(uri.application_id)][uri.doc_selector.document_path]
319        except KeyError:
320            raise NotFound()
321        return doc, etag
322
323    def _get_documents_list_operation(self, uri, profile):
324        try:
325            xcap_docs = profile["xcap"]
326        except KeyError:
327            raise NotFound()
328        return xcap_docs
329
330    def retrieve_profile(self, username, domain, operation, update, defer):
331        transaction = None
332        try:
333            if self.dburi is None:
334                raise NoDatabase()
335            transaction = sqlhub.processConnection.transaction()
336            try:
337                db_account = SipAccount.select(AND(SipAccount.q.username == username, SipAccount.q.domain == domain), connection = transaction, forUpdate = update)[0]
338            except IndexError:
339                raise NotFound()
340            profile = db_account.profile
341            result = operation(profile) # NB: may modify profile!
342            if update:
343                db_account.profile = profile
344            transaction.commit(close=True)
345        except Exception, e:
346            if transaction:
347                transaction.rollback()
348            reactor.callFromThread(defer.errback, e)
349        else:
350            reactor.callFromThread(defer.callback, result)
351
352    def update_dburi(self, dburi):
353        if self.dburi != dburi:
354            if self.dburi is not None:
355                sqlhub.processConnection.close()
356            if dburi is None:
357                sqlhub.processConnection
358            else:
359                sqlhub.processConnection = connectionForURI(dburi)
360            self.dburi = dburi
361
362
363class SipthorPasswordChecker(object):
364    implements(ICredentialsChecker)
365    credentialInterfaces = (IUsernamePassword, IUsernameHashedPassword)
366
367    def __init__(self):
368        self._database = DatabaseConnection()
369
370    def _query_credentials(self, credentials):
371        username, domain = credentials.username.split('@', 1)[0], credentials.realm
372        result = self._database.get_profile(username, domain)
373        result.addCallback(self._got_query_results, credentials)
374        result.addErrback(self._got_unsuccessfull)
375        return result
376
377    def _got_unsuccessfull(self, failure):
378        failure.trap(NotFound)
379        raise UnauthorizedLogin("Unauthorized login")
380
381    def _got_query_results(self, profile, credentials):
382        return self._authenticate_credentials(profile, credentials)
383
384    def _authenticate_credentials(self, profile, credentials):
385        raise NotImplementedError
386
387    def _checkedPassword(self, matched, username, realm):
388        if matched:
389            username = username.split('@', 1)[0]
390            ## this is the avatar ID
391            return "%s@%s" % (username, realm)
392        else:
393            raise UnauthorizedLogin("Unauthorized login")
394
395    def requestAvatarId(self, credentials):
396        """Return the avatar ID for the credentials which must have the username
397           and realm attributes, or an UnauthorizedLogin in case of a failure."""
398        d = self._query_credentials(credentials)
399        return d
400
401
402class PlainPasswordChecker(SipthorPasswordChecker):
403    """A credentials checker against a database subscriber table, where the passwords
404       are stored in plain text."""
405
406    implements(ICredentialsChecker)
407
408    def _authenticate_credentials(self, profile, credentials):
409        return maybeDeferred(
410                credentials.checkPassword, profile["password"]).addCallback(
411                self._checkedPassword, credentials.username, credentials.realm)
412
413
414class HashPasswordChecker(SipthorPasswordChecker):
415    """A credentials checker against a database subscriber table, where the passwords
416       are stored as MD5 hashes."""
417
418    implements(ICredentialsChecker)
419
420    def _authenticate_credentials(self, profile, credentials):
421        return maybeDeferred(
422                credentials.checkHash, profile["ha1"]).addCallback(
423                self._checkedPassword, credentials.username, credentials.realm)
424
425class Storage(object):
426    __metaclass__ = Singleton
427
428    def __init__(self):
429        self._database = DatabaseConnection()
430        self._provisioning = XCAPProvisioning()
431
432    def _normalize_document_path(self, uri):
433        if uri.application_id in ("pres-rules", "org.openmobilealliance.pres-rules"):
434            # some clients e.g. counterpath's eyebeam save presence rules under
435            # different filenames between versions and they expect to find the same
436            # information, thus we are forcing all presence rules documents to be
437            # saved under "index.xml" default filename
438            uri.doc_selector.document_path = "index.xml"
439
440    def get_document(self, uri, check_etag):
441        self._normalize_document_path(uri)
442        result = self._database.get(uri)
443        result.addCallback(self._got_document, check_etag)
444        result.addErrback(self._eb_not_found)
445        return result
446
447    def _eb_not_found(self, failure):
448        failure.trap(NotFound)
449        return StatusResponse(404)
450
451    def _got_document(self, (doc, etag), check_etag):
452        doc = doc.encode('utf-8')
453        check_etag(etag)
454        return StatusResponse(200, etag, doc)
455
456    def put_document(self, uri, document, check_etag):
457        document = document.decode('utf-8')
458        self._normalize_document_path(uri)
459        etag = make_random_etag(uri)
460        result = self._database.put(uri, document, check_etag, etag)
461        result.addCallback(self._cb_put, etag, "%s@%s" % (uri.user.username, uri.user.domain))
462        result.addErrback(self._eb_not_found)
463        return result
464
465    def _cb_put(self, found, etag, thor_key):
466        if found:
467            code = 200
468        else:
469            code = 201
470        self._provisioning.notify("update", "sip_account", thor_key)
471        return StatusResponse(code, etag)
472
473    def delete_document(self, uri, check_etag):
474        self._normalize_document_path(uri)
475        result = self._database.delete(uri, check_etag)
476        result.addCallback(self._cb_delete, "%s@%s" % (uri.user.username, uri.user.domain))
477        result.addErrback(self._eb_not_found)
478        return result
479
480    def _cb_delete(self, nothing, thor_key):
481        self._provisioning.notify("update", "sip_account", thor_key)
482        return StatusResponse(200)
483
484    def get_watchers(self, uri):
485        thor_key = "%s@%s" % (uri.user.username, uri.user.domain)
486        result = self._provisioning.get_watchers(thor_key)
487        result.addCallback(self._get_watchers_decode)
488        return result
489
490    def _get_watchers_decode(self, response):
491        if response.code == 200:
492            watchers = cjson.decode(response.data)
493            for watcher in watchers:
494                watcher["online"] = str(watcher["online"]).lower()
495            return watchers
496        else:
497            print "error: %s" % response
498
499    def get_documents_list(self, uri):
500        result = self._database.get_documents_list(uri)
501        result.addCallback(self._got_documents_list)
502        return result
503
504    def _got_documents_list(self, xcap_docs):
505        docs = {}
506        if xcap_docs:
507            for k, v in xcap_docs.iteritems():
508                for k2, v2 in v.iteritems():
509                    if docs.has_key(k):
510                        docs[k].append((k2, v2[1]))
511                    else:
512                        docs[k] = [(k2, v2[1])]
513        return docs
514
515
516installSignalHandlers = False
Note: See TracBrowser for help on using the browser.