Source code for rinse.client

"""SOAP client."""
from __future__ import print_function
import requests
from rinse import ENVELOPE_XSD
from rinse.util import SCHEMA, cached_property
from rinse.response import RinseResponse


[docs]class SoapClient(object): """Rinse SOAP client.""" def __init__(self, url, debug=False, **kwargs): """Set base attributes.""" self.url = url self.debug = debug self.kwargs = kwargs self.operations = {} self.soap_schema = SCHEMA[ENVELOPE_XSD] @cached_property def _session(self): """Cached instance of requests.Session.""" return requests.Session() def __call__(self, msg, action="", build_response=RinseResponse, debug=False): """Post 'msg' to remote service.""" # generate HTTP request from msg request = msg.request(self.url, action).prepare() if debug or self.debug: print('{} {}'.format(request.method, self.url)) print( ''.join( '{}: {}\n'.format(name, val) for name, val in sorted(request.headers.items()) ) ) print(request.body.decode('utf-8')) # perform HTTP(s) POST resp = self._session.send(request) return build_response(resp)