1import sys, string 2import dcerpc 3 4 5def ResizeBufferCall(fn, pipe, r): 6 7 r['buffer'] = None 8 r['buf_size'] = 0 9 10 result = fn(pipe, r) 11 12 if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER or \ 13 result['result'] == dcerpc.WERR_MORE_DATA: 14 r['buffer'] = result['buf_size'] * '\x00' 15 r['buf_size'] = result['buf_size'] 16 17 result = fn(pipe, r) 18 19 return result 20 21 22def test_OpenPrinterEx(pipe, printer): 23 24 print 'spoolss_OpenPrinterEx(%s)' % printer 25 26 printername = '\\\\%s' % dcerpc.dcerpc_server_name(pipe) 27 28 if printer is not None: 29 printername = printername + '\\%s' % printer 30 31 r = {} 32 r['printername'] = printername 33 r['datatype'] = None 34 r['devmode_ctr'] = {} 35 r['devmode_ctr']['size'] = 0 36 r['devmode_ctr']['devmode'] = None 37 r['access_mask'] = 0x02000000 38 r['level'] = 1 39 r['userlevel'] = {} 40 r['userlevel']['level1'] = {} 41 r['userlevel']['level1']['size'] = 0 42 r['userlevel']['level1']['client'] = None 43 r['userlevel']['level1']['user'] = None 44 r['userlevel']['level1']['build'] = 1381 45 r['userlevel']['level1']['major'] = 2 46 r['userlevel']['level1']['minor'] = 0 47 r['userlevel']['level1']['processor'] = 0 48 49 result = dcerpc.spoolss_OpenPrinterEx(pipe, r) 50 51 return result['handle'] 52 53 54def test_ClosePrinter(pipe, handle): 55 56 r = {} 57 r['handle'] = handle 58 59 dcerpc.spoolss_ClosePrinter(pipe, r) 60 61 62def test_GetPrinter(pipe, handle): 63 64 r = {} 65 r['handle'] = handle 66 67 for level in [0, 1, 2, 3, 4, 5, 6, 7]: 68 69 print 'spoolss_GetPrinter(level = %d)' % level 70 71 r['level'] = level 72 r['buffer'] = None 73 r['buf_size'] = 0 74 75 result = ResizeBufferCall(dcerpc.spoolss_GetPrinter, pipe, r) 76 77 78def test_EnumForms(pipe, handle): 79 80 print 'spoolss_EnumForms()' 81 82 r = {} 83 r['handle'] = handle 84 r['level'] = 1 85 r['buffer'] = None 86 r['buf_size'] = 0 87 88 result = ResizeBufferCall(dcerpc.spoolss_EnumForms, pipe, r) 89 90 forms = dcerpc.unmarshall_spoolss_FormInfo_array( 91 result['buffer'], r['level'], result['count']) 92 93 for form in forms: 94 95 r = {} 96 r['handle'] = handle 97 r['formname'] = form['info1']['formname'] 98 r['level'] = 1 99 100 result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r) 101 102 103def test_EnumPorts(pipe, handle): 104 105 print 'spoolss_EnumPorts()' 106 107 for level in [1, 2]: 108 109 r = {} 110 r['handle'] = handle 111 r['servername'] = None 112 r['level'] = level 113 114 result = ResizeBufferCall(dcerpc.spoolss_EnumPorts, pipe, r) 115 116 ports = dcerpc.unmarshall_spoolss_PortInfo_array( 117 result['buffer'], r['level'], result['count']) 118 119 if level == 1: 120 port_names = map(lambda x: x['info1']['port_name'], ports) 121 122 123def test_DeleteForm(pipe, handle, formname): 124 125 r = {} 126 r['handle'] = handle 127 r['formname'] = formname 128 129 dcerpc.spoolss_DeleteForm(pipe, r) 130 131 132def test_GetForm(pipe, handle, formname): 133 134 r = {} 135 r['handle'] = handle 136 r['formname'] = formname 137 r['level'] = 1 138 139 result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r) 140 141 return result['info']['info1'] 142 143 144def test_SetForm(pipe, handle, form): 145 146 print 'spoolss_SetForm()' 147 148 r = {} 149 r['handle'] = handle 150 r['level'] = 1 151 r['formname'] = form['info1']['formname'] 152 r['info'] = form 153 154 dcerpc.spoolss_SetForm(pipe, r) 155 156 newform = test_GetForm(pipe, handle, r['formname']) 157 158 if form['info1'] != newform: 159 print 'SetForm: mismatch: %s != %s' % \ 160 (r['info']['info1'], f) 161 sys.exit(1) 162 163 164def test_AddForm(pipe, handle): 165 166 print 'spoolss_AddForm()' 167 168 formname = '__testform__' 169 170 r = {} 171 r['handle'] = handle 172 r['level'] = 1 173 r['info'] = {} 174 r['info']['info1'] = {} 175 r['info']['info1']['formname'] = formname 176 r['info']['info1']['flags'] = 0x0002 177 r['info']['info1']['width'] = 100 178 r['info']['info1']['length'] = 100 179 r['info']['info1']['left'] = 0 180 r['info']['info1']['top'] = 1000 181 r['info']['info1']['right'] = 2000 182 r['info']['info1']['bottom'] = 3000 183 184 try: 185 result = dcerpc.spoolss_AddForm(pipe, r) 186 except dcerpc.WERROR, arg: 187 if arg[0] == dcerpc.WERR_ALREADY_EXISTS: 188 test_DeleteForm(pipe, handle, formname) 189 result = dcerpc.spoolss_AddForm(pipe, r) 190 191 f = test_GetForm(pipe, handle, formname) 192 193 if r['info']['info1'] != f: 194 print 'AddForm: mismatch: %s != %s' % \ 195 (r['info']['info1'], f) 196 sys.exit(1) 197 198 r['formname'] = formname 199 200 test_SetForm(pipe, handle, r['info']) 201 202 test_DeleteForm(pipe, handle, formname) 203 204 205def test_EnumJobs(pipe, handle): 206 207 print 'spoolss_EnumJobs()' 208 209 r = {} 210 r['handle'] = handle 211 r['firstjob'] = 0 212 r['numjobs'] = 0xffffffff 213 r['level'] = 1 214 215 result = ResizeBufferCall(dcerpc.spoolss_EnumJobs, pipe, r) 216 217 if result['buffer'] is None: 218 return 219 220 jobs = dcerpc.unmarshall_spoolss_JobInfo_array( 221 result['buffer'], r['level'], result['count']) 222 223 for job in jobs: 224 225 s = {} 226 s['handle'] = handle 227 s['job_id'] = job['info1']['job_id'] 228 s['level'] = 1 229 230 result = ResizeBufferCall(dcerpc.spoolss_GetJob, pipe, s) 231 232 if result['info'] != job: 233 print 'EnumJobs: mismatch: %s != %s' % (result['info'], job) 234 sys.exit(1) 235 236 237 # TODO: AddJob, DeleteJob, ScheduleJob 238 239 240def test_EnumPrinterData(pipe, handle): 241 242 print 'test_EnumPrinterData()' 243 244 enum_index = 0 245 246 while 1: 247 248 r = {} 249 r['handle'] = handle 250 r['enum_index'] = enum_index 251 252 r['value_offered'] = 0 253 r['data_size'] = 0 254 255 result = dcerpc.spoolss_EnumPrinterData(pipe, r) 256 257 r['value_offered'] = result['value_needed'] 258 r['data_size'] = result['data_size'] 259 260 result = dcerpc.spoolss_EnumPrinterData(pipe, r) 261 262 if result['result'] == dcerpc.WERR_NO_MORE_ITEMS: 263 break 264 265 s = {} 266 s['handle'] = handle 267 s['value_name'] = result['value_name'] 268 269 result2 = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, s) 270 271 if result['buffer'][:result2['buf_size']] != result2['buffer']: 272 print 'EnumPrinterData/GetPrinterData mismatch' 273 sys.exit(1) 274 275 enum_index += 1 276 277 278def test_SetPrinterDataEx(pipe, handle): 279 280 valuename = '__printerdataextest__' 281 data = '12345' 282 283 r = {} 284 r['handle'] = handle 285 r['key_name'] = 'DsSpooler' 286 r['value_name'] = valuename 287 r['type'] = 3 288 r['buffer'] = data 289 r['buf_size'] = len(data) 290 291 result = dcerpc.spoolss_SetPrinterDataEx(pipe, r) 292 293 294def test_EnumPrinterDataEx(pipe, handle): 295 296 r = {} 297 r['handle'] = handle 298 r['key_name'] = 'DsSpooler' 299 r['buf_size'] = 0 300 301 result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r) 302 303 if result['result'] == dcerpc.WERR_MORE_DATA: 304 r['buf_size'] = result['buf_size'] 305 306 result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r) 307 308 # TODO: test spoolss_GetPrinterDataEx() 309 310 311def test_SetPrinterData(pipe, handle): 312 313 print 'testing spoolss_SetPrinterData()' 314 315 valuename = '__printerdatatest__' 316 data = '12345' 317 318 r = {} 319 r['handle'] = handle 320 r['value_name'] = valuename 321 r['type'] = 3 # REG_BINARY 322 r['buffer'] = data 323 r['real_len'] = 5 324 325 dcerpc.spoolss_SetPrinterData(pipe, r) 326 327 s = {} 328 s['handle'] = handle 329 s['value_name'] = valuename 330 331 result = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, r) 332 333 if result['buffer'] != data: 334 print 'SetPrinterData: mismatch' 335 sys.exit(1) 336 337 dcerpc.spoolss_DeletePrinterData(pipe, r) 338 339 340def test_EnumPrinters(pipe): 341 342 print 'testing spoolss_EnumPrinters()' 343 344 printer_names = None 345 346 r = {} 347 r['flags'] = 0x02 348 r['server'] = None 349 350 for level in [0, 1, 2, 4, 5]: 351 352 print 'test_EnumPrinters(level = %d)' % level 353 354 r['level'] = level 355 356 result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r) 357 358 printers = dcerpc.unmarshall_spoolss_PrinterInfo_array( 359 result['buffer'], r['level'], result['count']) 360 361 if level == 2: 362 for p in printers: 363 364 # A nice check is for the specversion in the 365 # devicemode. This has always been observed to be 366 # 1025. 367 368 if p['info2']['devmode']['specversion'] != 1025: 369 print 'test_EnumPrinters: specversion != 1025' 370 sys.exit(1) 371 372 r['level'] = 1 373 result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r) 374 375 for printer in dcerpc.unmarshall_spoolss_PrinterInfo_array( 376 result['buffer'], r['level'], result['count']): 377 378 if string.find(printer['info1']['name'], '\\\\') == 0: 379 print 'Skipping remote printer %s' % printer['info1']['name'] 380 continue 381 382 printername = string.split(printer['info1']['name'], ',')[0] 383 384 handle = test_OpenPrinterEx(pipe, printername) 385 386 test_GetPrinter(pipe, handle) 387 test_EnumPorts(pipe, handle) 388 test_EnumForms(pipe, handle) 389 test_AddForm(pipe, handle) 390 test_EnumJobs(pipe, handle) 391 test_EnumPrinterData(pipe, handle) 392 test_EnumPrinterDataEx(pipe, handle) 393 test_SetPrinterData(pipe, handle) 394# test_SetPrinterDataEx(pipe, handle) 395 test_ClosePrinter(pipe, handle) 396 397 398def test_EnumPrinterDrivers(pipe): 399 400 print 'test spoolss_EnumPrinterDrivers()' 401 402 for level in [1, 2, 3]: 403 404 r = {} 405 r['server'] = None 406 r['environment'] = None 407 r['level'] = level 408 409 result = ResizeBufferCall(dcerpc.spoolss_EnumPrinterDrivers, pipe, r) 410 411 drivers = dcerpc.unmarshall_spoolss_DriverInfo_array( 412 result['buffer'], r['level'], result['count']) 413 414 if level == 1: 415 driver_names = map(lambda x: x['info1']['driver_name'], drivers) 416 417 418def test_PrintServer(pipe): 419 420 handle = test_OpenPrinterEx(pipe, None) 421 422 # EnumForms and AddForm tests return WERR_BADFID here (??) 423 424 test_ClosePrinter(pipe, handle) 425 426 427def runtests(binding, domain, username, password): 428 429 print 'Testing SPOOLSS pipe' 430 431 pipe = dcerpc.pipe_connect(binding, 432 dcerpc.DCERPC_SPOOLSS_UUID, dcerpc.DCERPC_SPOOLSS_VERSION, 433 domain, username, password) 434 435 test_EnumPrinters(pipe) 436 test_EnumPrinterDrivers(pipe) 437 test_PrintServer(pipe) 438