|
| 1 | +from Pyro5.compatibility import Pyro4 |
| 2 | +import Pyro4.core |
| 3 | +import Pyro4.errors |
| 4 | + |
| 5 | + |
| 6 | +Pyro4.config.SSL = True |
| 7 | +Pyro4.config.SSL_CACERTS = "../../certs/server_cert.pem" # to make ssl accept the self-signed server cert |
| 8 | +Pyro4.config.SSL_CLIENTCERT = "../../certs/client_cert.pem" |
| 9 | +Pyro4.config.SSL_CLIENTKEY = "../../certs/client_key.pem" |
| 10 | +print("SSL enabled (2-way).") |
| 11 | + |
| 12 | + |
| 13 | +def verify_cert(cert): |
| 14 | + if not cert: |
| 15 | + raise Pyro4.errors.CommunicationError("cert missing") |
| 16 | + # note: hostname and expiry date validation is already successfully performed by the SSL layer itself |
| 17 | + # not_before = datetime.datetime.utcfromtimestamp(ssl.cert_time_to_seconds(cert["notBefore"])) |
| 18 | + # print("not before:", not_before) |
| 19 | + # not_after = datetime.datetime.utcfromtimestamp(ssl.cert_time_to_seconds(cert["notAfter"])) |
| 20 | + # print("not after:", not_after) |
| 21 | + # today = datetime.datetime.now() |
| 22 | + # if today > not_after or today < not_before: |
| 23 | + # raise Pyro4.errors.CommunicationError("cert not yet valid or expired") |
| 24 | + if cert["serialNumber"] != "8C3AD2A88A0657EF": |
| 25 | + raise Pyro4.errors.CommunicationError("cert serial number incorrect", cert["serialNumber"]) |
| 26 | + issuer = dict(p[0] for p in cert["issuer"]) |
| 27 | + subject = dict(p[0] for p in cert["subject"]) |
| 28 | + if issuer["organizationName"] != "Razorvine.net": |
| 29 | + # issuer is not often relevant I guess, but just to show that you have the data |
| 30 | + raise Pyro4.errors.CommunicationError("cert not issued by Razorvine.net") |
| 31 | + if subject["countryName"] != "NL": |
| 32 | + raise Pyro4.errors.CommunicationError("cert not for country NL") |
| 33 | + if subject["organizationName"] != "Razorvine.net": |
| 34 | + raise Pyro4.errors.CommunicationError("cert not for Razorvine.net") |
| 35 | + print("(SSL server cert is ok: serial={ser}, subject={subj})" |
| 36 | + .format(ser=cert["serialNumber"], subj=subject["organizationName"])) |
| 37 | + |
| 38 | + |
| 39 | +# to make Pyro verify the certificate on new connections, use the handshake mechanism: |
| 40 | +class CertCheckingProxy(Pyro4.Proxy): |
| 41 | + def _pyroValidateHandshake(self, response): |
| 42 | + cert = self._pyroConnection.getpeercert() |
| 43 | + verify_cert(cert) |
| 44 | + |
| 45 | + |
| 46 | +# Note: to automatically enforce certificate verification for all proxy objects you create, |
| 47 | +# you can also monkey-patch the method in the Proxy class itself. |
| 48 | +# Then you don't have to make sure that you're using CertCheckingProxy every time. |
| 49 | +# However some other Proxy subclass can (will) override this again! |
| 50 | +# |
| 51 | +# def certverifier(self, response): |
| 52 | +# cert = self._pyroConnection.getpeercert() |
| 53 | +# verify_cert(cert) |
| 54 | +# Pyro4.core.Proxy._pyroValidateHandshake = certverifier |
| 55 | + |
| 56 | + |
| 57 | +uri = input("Server uri: ").strip() |
| 58 | +with CertCheckingProxy(uri) as p: |
| 59 | + response = p.echo("client speaking") |
| 60 | + print("response:", response) |
0 commit comments