Package backend :: Package satellite_tools :: Module satCerts
[hide private]
[frames | no frames]

Source Code for Module backend.satellite_tools.satCerts

  1  # 
  2  # Cert-related functions 
  3  #   - RHN certificate 
  4  #   - SSL CA certificate 
  5  # 
  6  # Copyright (c) 2008--2018 Red Hat, Inc. 
  7  # 
  8  # This software is licensed to you under the GNU General Public License, 
  9  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
 10  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
 11  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
 12  # along with this software; if not, see 
 13  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 14  # 
 15  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 16  # granted to use or replicate Red Hat trademarks that are incorporated 
 17  # in this software or its documentation. 
 18  # 
 19   
 20  # language imports 
 21  import sys 
 22  from datetime import datetime 
 23   
 24  from M2Crypto import X509 
 25   
 26  # other rhn imports 
 27  from spacewalk.common.rhnLib import utc 
 28  from spacewalk.common.usix import raise_with_tb 
 29  from spacewalk.server import rhnSQL 
 30  from spacewalk.common.rhnTB import fetchTraceback 
 31   
 32  # bare-except and broad-except 
 33  # pylint: disable=W0702,W0703 
 34   
 35   
 36  # Get not before and not after timestamps from given X509 certificate 
def get_certificate_info(cert_str):
    """Extract the identifying fields and validity window of a certificate.

    ``cert_str`` is parsed with ``X509.load_cert_string``. The result is the
    tuple ``(cn, serial_number, not_before, not_after)``: the subject's CN,
    the certificate serial number, and the not-before / not-after timestamps
    as produced by ``get_datetime()``.
    """
    x509_cert = X509.load_cert_string(cert_str)
    valid_from = x509_cert.get_not_before().get_datetime()
    valid_until = x509_cert.get_not_after().get_datetime()
    common_name = x509_cert.get_subject().CN
    return common_name, x509_cert.get_serial_number(), valid_from, valid_until
45 46
def verify_certificate_dates(cert_str):
    """Tell whether the current time lies inside the certificate's validity.

    Returns True only when not-before < now < not-after for the certificate
    in ``cert_str``; both bounds are exclusive. "now" is taken with the
    ``utc`` tzinfo imported from rhnLib.
    """
    cert_info = get_certificate_info(cert_str)
    valid_from, valid_until = cert_info[2], cert_info[3]
    current_time = datetime.now(utc)
    return valid_from < current_time and current_time < valid_until
51 52
def get_all_orgs():
    """Return the rows selected by _queryLookupOrgId (ids from web_customer).

    The rows come back in whatever form ``fetchall_dict()`` produces; a falsy
    result (e.g. no rows) is replaced by an empty list. Nothing is created or
    modified here: the function only reads.

    NOTE: a similar lookup exists elsewhere (backend.py); this one is kept
    separate so callers can tell "no orgs" apart from "too many orgs".
    """
    cursor = rhnSQL.prepare(_queryLookupOrgId)
    cursor.execute()
    org_rows = cursor.fetchall_dict()
    if not org_rows:
        return []
    return org_rows


# Selects every id from web_customer; executed by get_all_orgs().
_queryLookupOrgId = rhnSQL.Statement("""
    SELECT id
      FROM web_customer
""")

#
# SSL CA certificate section
#


class CaCertInsertionError(Exception):
    """Raised when a CA certificate cannot be written to rhnCryptoKey."""
80 81
def lookup_cert(description, org_id):
    """Fetch the stored SSL crypto-key row for a description and org.

    A falsy ``org_id`` (None, 0, ...) selects the row whose org_id is NULL;
    any other value selects the row belonging to that org. Returns whatever
    ``fetchone_dict()`` yields for the query (columns: id, description,
    type_label, key).
    """
    if not org_id:
        cursor = rhnSQL.prepare(_querySelectCryptoCertInfoNullOrg)
        cursor.execute(description=description)
    else:
        cursor = rhnSQL.prepare(_querySelectCryptoCertInfo)
        cursor.execute(description=description, org_id=org_id)

    return cursor.fetchone_dict()
92
def _checkCertMatch_rhnCryptoKey(cert, description, org_id, deleteRowYN=0,
                                 verbosity=0):
    """Compare ``cert`` with the SSL certificate stored for description/org.

    Returns:
        None  - the stored certificate equals ``cert`` (nothing to do)
        -1    - no certificate is stored, or a differing one was found and
                deleted because ``deleteRowYN`` is true
        0...N - id of the differing stored row (``deleteRowYN`` is false)

    A message is printed on an exact match when ``verbosity`` is true.
    Used ONLY by: store_rhnCryptoKey(...)
    """
    existing = lookup_cert(description, org_id)
    if not existing:
        return -1

    if cert == rhnSQL.read_lob(existing['key']):
        # match found, nothing to do
        if verbosity:
            print("Nothing to do: certificate to be pushed matches certificate in database.")
        return None

    # there can only be one (bugzilla: 120297)
    found_id = int(existing['id'])
    if not deleteRowYN:
        return found_id

    # Drop the stale row so a fresh one can be inserted; no commit is issued
    # here.
    purge = rhnSQL.prepare('delete from rhnCryptoKey where id=:rhn_cryptokey_id')
    purge.execute(rhn_cryptokey_id=found_id)
    return -1
125 126
def _insertPrep_rhnCryptoKey(rhn_cryptokey_id, description, org_id):
    """Insert a new rhnCryptoKey row and return its id.

    The value passed in as ``rhn_cryptokey_id`` is ignored: a fresh id is
    always drawn from the ``rhn_cryptokey_id_seq`` sequence. The row is
    inserted via _queryInsertCryptoCertInfo; the certificate itself is
    written afterwards by the lob update.
    Used ONLY by: store_rhnCryptoKey(...)
    """
    # NOTE: description carries a uniqueness constraint, so a row cannot be
    #       re-inserted under a new id; insertion only happens when no row
    #       exists (bugzilla: 120297).
    new_id = rhnSQL.Sequence('rhn_cryptokey_id_seq').next()
    insert_cursor = rhnSQL.prepare(_queryInsertCryptoCertInfo)
    insert_cursor.execute(rhn_cryptokey_id=new_id,
                          description=description, org_id=org_id)
    return new_id
145 146
def _lobUpdate_rhnCryptoKey(rhn_cryptokey_id, cert):
    """Write ``cert`` into the ``key`` column of the given rhnCryptoKey row.

    Raises CaCertInsertionError, carrying the traceback of the underlying
    failure, if ``update_blob`` raises anything at all.
    """
    # update_blob is the wrapper that accommodates the differences between
    # Oracle and PostgreSQL.
    blob_cursor = rhnSQL.cursor()
    try:
        blob_cursor.update_blob("rhnCryptoKey", "key",
                                "WHERE id = :rhn_cryptokey_id", cert,
                                rhn_cryptokey_id=rhn_cryptokey_id)
    except:
        # didn't go in! (the bare except is deliberate, see the module-level
        # pylint disable)
        failure = CaCertInsertionError("ERROR: CA certificate failed to be "
                                       "inserted into the database")
        raise_with_tb(failure, sys.exc_info()[2])
160 161
def store_CaCert(description, caCert, verbosity=0):
    """Store the CA certificate file ``caCert`` once per org and once org-less.

    The file is read in binary mode and stripped of surrounding whitespace,
    then handed to store_rhnCryptoKey for every id returned by get_all_orgs()
    and finally with an org id of None.
    """
    org_rows = get_all_orgs()
    # extra pass with no org: stores the certificate under a NULL org_id
    org_rows.append({'id': None})

    with open(caCert, 'rb') as cert_file:
        cert = cert_file.read().strip()

    for org_row in org_rows:
        store_rhnCryptoKey(description, cert, org_row['id'], verbosity)
174
def store_rhnCryptoKey(description, cert, org_id, verbosity=0):
    """Store ``cert`` in rhnCryptoKey for the given description and org.

    Steps:
        _checkCertMatch_rhnCryptoKey - skip if identical, else delete old row
        _insertPrep_rhnCryptoKey     - insert a fresh row
        _lobUpdate_rhnCryptoKey      - write the certificate into the row
    and finally commit.

    Raises CaCertInsertionError when an rhnSQL SQLError occurs (or when the
    lob update fails).
    """
    try:
        # look for a cert match in the database
        key_id = _checkCertMatch_rhnCryptoKey(cert, description, org_id,
                                              deleteRowYN=1,
                                              verbosity=verbosity)
        if key_id is not None:
            # -1 means no row is left in the database: insert one
            if key_id == -1:
                key_id = _insertPrep_rhnCryptoKey(key_id, description, org_id)
            # write/update
            _lobUpdate_rhnCryptoKey(key_id, cert)
            rhnSQL.commit()
        # key_id is None: nothing to do - cert matches
    except rhnSQL.sql_base.SQLError:
        raise_with_tb(CaCertInsertionError(
            "...the traceback: %s" % fetchTraceback()), sys.exc_info()[2])
201 202
def delete_rhnCryptoKey_null_org(description_prefix):
    """Delete org-less SSL rows whose description starts with the prefix.

    Executes _queryDeleteCryptoCertInfoNullOrg with ``description_prefix``;
    no commit is issued here.
    """
    delete_cursor = rhnSQL.prepare(_queryDeleteCryptoCertInfoNullOrg)
    delete_cursor.execute(
        description_prefix=description_prefix,
    )


# Deletes SSL-type rhnCryptoKey rows with a NULL org_id whose description
# begins with :description_prefix; used by delete_rhnCryptoKey_null_org().
# NOTE(review): the doubled '%%' is presumably there to survive a
# %-formatting layer in the database driver - confirm.
_queryDeleteCryptoCertInfoNullOrg = rhnSQL.Statement("""
    DELETE FROM rhnCryptoKey ck
    WHERE ck.description LIKE :description_prefix || '%%'
      AND ck.crypto_key_type_id = (SELECT id FROM rhnCryptoKeyType WHERE label = 'SSL')
      AND ck.org_id is NULL
""")

# Selects the SSL-type rhnCryptoKey row matching :description and :org_id;
# used by lookup_cert() when an org id is given.
_querySelectCryptoCertInfo = rhnSQL.Statement("""
    SELECT ck.id, ck.description, ckt.label as type_label, ck.key
      FROM rhnCryptoKeyType ckt,
           rhnCryptoKey ck
     WHERE ckt.label = 'SSL'
       AND ckt.id = ck.crypto_key_type_id
       AND ck.description = :description
       AND ck.org_id = :org_id
""")

# Same selection as above, but for the row whose org_id is NULL; used by
# lookup_cert() when the org id is falsy.
_querySelectCryptoCertInfoNullOrg = rhnSQL.Statement("""
    SELECT ck.id, ck.description, ckt.label as type_label, ck.key
      FROM rhnCryptoKeyType ckt,
           rhnCryptoKey ck
     WHERE ckt.label = 'SSL'
       AND ckt.id = ck.crypto_key_type_id
       AND ck.description = :description
       AND ck.org_id is NULL
""")

# Inserts a new SSL-type row with an empty blob as the key; the certificate
# is written into it later by _lobUpdate_rhnCryptoKey().
_queryInsertCryptoCertInfo = rhnSQL.Statement("""
    INSERT into rhnCryptoKey
           (id, org_id, description, crypto_key_type_id, key)
    SELECT :rhn_cryptokey_id, :org_id, :description, ckt.id, empty_blob()
      FROM rhnCryptoKeyType ckt
     WHERE ckt.label = 'SSL'
""")


def _test_store_rhnCryptoKey(caCert):
    """Manual check: store ``caCert`` as 'RHN-ORG-TRUSTED-SSL-CERT'."""
    store_CaCert('RHN-ORG-TRUSTED-SSL-CERT', caCert)


if __name__ == '__main__':
    # Manual run: open the database, store ./ca.crt through the test helper,
    # then close the database again.
    rhnSQL.initDB()

    _test_store_rhnCryptoKey('ca.crt')

    # NOTE!!! This has been seg-faulting on exit, specifically upon closeDB()
    # Bugzilla: 127324

    print("end of __main__")
    rhnSQL.closeDB()
    print("we have closed the database")