21 import sys
22 from datetime import datetime
23
24 from M2Crypto import X509
25
26
27 from spacewalk.common.rhnLib import utc
28 from spacewalk.common.usix import raise_with_tb
29 from spacewalk.server import rhnSQL
30 from spacewalk.common.rhnTB import fetchTraceback
31
32
33
34
35
36
38 cert = X509.load_cert_string(cert_str)
39 not_before = cert.get_not_before().get_datetime()
40 not_after = cert.get_not_after().get_datetime()
41 subject = cert.get_subject()
42 cn = subject.CN
43 serial_number = cert.get_serial_number()
44 return cn, serial_number, not_before, not_after
45
46
48 _, _, not_before, not_after = get_certificate_info(cert_str)
49 now = datetime.now(utc)
50 return not_before < now < not_after
51
52
54 """ Fetch org_id. Create first org_id if needed.
55 owner only needed if no org_id present
56 NOTE: this is duplicated elsewhere (backend.py)
57 but I need the error differentiation of (1) too many orgs
58 and (2) no orgs. backend.py does not differentiate.
59 """
60
61
62 h = rhnSQL.prepare(_queryLookupOrgId)
63 h.execute()
64 rows = h.fetchall_dict()
65 return rows or []
66
67
# Returns the id of every row in web_customer. There is no WHERE clause, so
# the caller receives all organisations and has to decide what zero rows or
# several rows mean.
68 _queryLookupOrgId = rhnSQL.Statement("""
69 SELECT id
70 FROM web_customer
71 """)
72
73
74
75
76
77
80
81
92
95 """ Is there a CA SSL certificate already in the database?
96 If yes:
97 return ID:
98 -1, then no cert in DB
99 None if they are identical (i.e., nothing to do)
100 0...N if cert is in database
101
102 if found, optionally deletes the row and returns -1
103 Used ONLY by: store_rhnCryptoKey(...)
104 """
105
106 row = lookup_cert(description, org_id)
107 rhn_cryptokey_id = -1
108 if row:
109 if cert == rhnSQL.read_lob(row['key']):
110
111 if verbosity:
112 print("Nothing to do: certificate to be pushed matches certificate in database.")
113 return None
114
115 rhn_cryptokey_id = int(row['id'])
116
117
118 if deleteRowYN:
119
120 h = rhnSQL.prepare('delete from rhnCryptoKey where id=:rhn_cryptokey_id')
121 h.execute(rhn_cryptokey_id=rhn_cryptokey_id)
122
123 rhn_cryptokey_id = -1
124 return rhn_cryptokey_id
125
126
128 """ inserts a row given that a cert is not already in the database
129 lob rewrite occurs later during update.
130 Used ONLY by: store_rhnCryptoKey(...)
131 """
132
133
134
135
136
137 rhn_cryptokey_id_seq = rhnSQL.Sequence('rhn_cryptokey_id_seq')
138 rhn_cryptokey_id = rhn_cryptokey_id_seq.next()
139
140 h = rhnSQL.prepare(_queryInsertCryptoCertInfo)
141
142 h.execute(rhn_cryptokey_id=rhn_cryptokey_id,
143 description=description, org_id=org_id)
144 return rhn_cryptokey_id
145
146
148 """ writes/updates the cert as a lob """
149
150
151
152 h = rhnSQL.cursor()
153 try:
154 h.update_blob("rhnCryptoKey", "key", "WHERE id = :rhn_cryptokey_id",
155 cert, rhn_cryptokey_id=rhn_cryptokey_id)
156 except:
157
158 raise_with_tb(CaCertInsertionError("ERROR: CA certificate failed to be "
159 "inserted into the database"), sys.exc_info()[2])
160
161
174
201
202
206
207
# Deletes SSL-type keys that belong to no organisation (org_id IS NULL) and
# whose description starts with :description_prefix.
# NOTE(review): the prefix is a bind parameter, so it cannot alter the
# statement text, but any '%' or '_' inside the bound value still acts as a
# LIKE wildcard and widens the set of rows deleted. No ESCAPE clause is
# given; callers (not visible here) should pass a fixed or validated prefix.
# NOTE(review): '%%' presumably reaches the database as a single '%' after
# the driver's parameter formatting -- confirm.
208 _queryDeleteCryptoCertInfoNullOrg = rhnSQL.Statement("""
209 DELETE FROM rhnCryptoKey ck
210 WHERE ck.description LIKE :description_prefix || '%%'
211 AND ck.crypto_key_type_id = (SELECT id FROM rhnCryptoKeyType WHERE label = 'SSL')
212 AND ck.org_id is NULL
213 """)
214
# Looks up the SSL-type key stored under an exact description for one
# organisation. Selects the row id, the description, the type label and the
# key column.
215 _querySelectCryptoCertInfo = rhnSQL.Statement("""
216 SELECT ck.id, ck.description, ckt.label as type_label, ck.key
217 FROM rhnCryptoKeyType ckt,
218 rhnCryptoKey ck
219 WHERE ckt.label = 'SSL'
220 AND ckt.id = ck.crypto_key_type_id
221 AND ck.description = :description
222 AND ck.org_id = :org_id
223 """)
224
# Looks up the SSL-type key stored under an exact description with no
# organisation (org_id IS NULL). Kept separate from
# _querySelectCryptoCertInfo because "org_id = :org_id" never matches a NULL
# org_id in SQL, even when the bound value is NULL.
225 _querySelectCryptoCertInfoNullOrg = rhnSQL.Statement("""
226 SELECT ck.id, ck.description, ckt.label as type_label, ck.key
227 FROM rhnCryptoKeyType ckt,
228 rhnCryptoKey ck
229 WHERE ckt.label = 'SSL'
230 AND ckt.id = ck.crypto_key_type_id
231 AND ck.description = :description
232 AND ck.org_id is NULL
233 """)
234
# Creates the rhnCryptoKey row with an empty BLOB in the key column; the
# certificate bytes are not part of this statement.
# crypto_key_type_id is taken from the rhnCryptoKeyType row labelled 'SSL'.
# Because this is INSERT ... SELECT, no row is inserted when that type row
# is missing, and the statement itself raises no error in that case.
235 _queryInsertCryptoCertInfo = rhnSQL.Statement("""
236 INSERT into rhnCryptoKey
237 (id, org_id, description, crypto_key_type_id, key)
238 SELECT :rhn_cryptokey_id, :org_id, :description, ckt.id, empty_blob()
239 FROM rhnCryptoKeyType ckt
240 WHERE ckt.label = 'SSL'
241 """)
242
246
# Script entry point: opens the database, passes 'ca.crt' to
# _test_store_rhnCryptoKey, then closes the database and prints progress.
# NOTE(review): no try/finally is visible around the test call, so closeDB()
# appears to be skipped if that call raises.
247 if __name__ == '__main__':
248 rhnSQL.initDB()
249
250 _test_store_rhnCryptoKey('ca.crt')
251
252
253
254
255 print("end of __main__")
256 rhnSQL.closeDB()
257 print("we have closed the database")
258