• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /asuswrt-rt-n18u-9.0.0.4.380.2695/release/src-rt-6.x.4708/router/samba-3.5.8/source4/scripting/python/samba/torture/
1import sys, string
2import dcerpc
3
4
5def ResizeBufferCall(fn, pipe, r):
6
7    r['buffer'] = None
8    r['buf_size'] = 0
9
10    result = fn(pipe, r)
11
12    if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER or \
13       result['result'] == dcerpc.WERR_MORE_DATA:
14        r['buffer'] = result['buf_size'] * '\x00'
15        r['buf_size'] = result['buf_size']
16
17    result = fn(pipe, r)
18
19    return result
20
21
22def test_OpenPrinterEx(pipe, printer):
23
24    print 'spoolss_OpenPrinterEx(%s)' % printer
25
26    printername = '\\\\%s' % dcerpc.dcerpc_server_name(pipe)
27
28    if printer is not None:
29        printername = printername + '\\%s' % printer
30
31    r = {}
32    r['printername'] = printername
33    r['datatype'] = None
34    r['devmode_ctr'] = {}
35    r['devmode_ctr']['size'] = 0
36    r['devmode_ctr']['devmode'] = None
37    r['access_mask'] = 0x02000000
38    r['level'] = 1
39    r['userlevel'] = {}
40    r['userlevel']['level1'] = {}
41    r['userlevel']['level1']['size'] = 0
42    r['userlevel']['level1']['client'] = None
43    r['userlevel']['level1']['user'] = None
44    r['userlevel']['level1']['build'] = 1381
45    r['userlevel']['level1']['major'] = 2
46    r['userlevel']['level1']['minor'] = 0
47    r['userlevel']['level1']['processor'] = 0
48
49    result = dcerpc.spoolss_OpenPrinterEx(pipe, r)
50
51    return result['handle']
52
53
54def test_ClosePrinter(pipe, handle):
55
56    r = {}
57    r['handle'] = handle
58
59    dcerpc.spoolss_ClosePrinter(pipe, r)
60
61
62def test_GetPrinter(pipe, handle):
63
64    r = {}
65    r['handle'] = handle
66
67    for level in [0, 1, 2, 3, 4, 5, 6, 7]:
68
69        print 'spoolss_GetPrinter(level = %d)' % level
70
71        r['level'] = level
72        r['buffer'] = None
73        r['buf_size'] = 0
74
75        result = ResizeBufferCall(dcerpc.spoolss_GetPrinter, pipe, r)
76
77
78def test_EnumForms(pipe, handle):
79
80    print 'spoolss_EnumForms()'
81
82    r = {}
83    r['handle'] = handle
84    r['level'] = 1
85    r['buffer'] = None
86    r['buf_size'] = 0
87
88    result = ResizeBufferCall(dcerpc.spoolss_EnumForms, pipe, r)
89
90    forms = dcerpc.unmarshall_spoolss_FormInfo_array(
91        result['buffer'], r['level'], result['count'])
92
93    for form in forms:
94
95        r = {}
96        r['handle'] = handle
97        r['formname'] = form['info1']['formname']
98        r['level'] = 1
99
100        result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r)
101
102
103def test_EnumPorts(pipe, handle):
104
105    print 'spoolss_EnumPorts()'
106
107    for level in [1, 2]:
108
109        r = {}
110        r['handle'] = handle
111        r['servername'] = None
112        r['level'] = level
113
114        result = ResizeBufferCall(dcerpc.spoolss_EnumPorts, pipe, r)
115
116        ports = dcerpc.unmarshall_spoolss_PortInfo_array(
117            result['buffer'], r['level'], result['count'])
118
119        if level == 1:
120            port_names = map(lambda x: x['info1']['port_name'], ports)
121
122
123def test_DeleteForm(pipe, handle, formname):
124
125    r = {}
126    r['handle'] = handle
127    r['formname'] = formname
128
129    dcerpc.spoolss_DeleteForm(pipe, r)
130
131
132def test_GetForm(pipe, handle, formname):
133
134    r = {}
135    r['handle'] = handle
136    r['formname'] = formname
137    r['level'] = 1
138
139    result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r)
140
141    return result['info']['info1']
142
143
144def test_SetForm(pipe, handle, form):
145
146    print 'spoolss_SetForm()'
147
148    r = {}
149    r['handle'] = handle
150    r['level'] = 1
151    r['formname'] = form['info1']['formname']
152    r['info'] = form
153
154    dcerpc.spoolss_SetForm(pipe, r)
155
156    newform = test_GetForm(pipe, handle, r['formname'])
157
158    if form['info1'] != newform:
159        print 'SetForm: mismatch: %s != %s' % \
160              (r['info']['info1'], f)
161        sys.exit(1)
162
163
164def test_AddForm(pipe, handle):
165
166    print 'spoolss_AddForm()'
167
168    formname = '__testform__'
169
170    r = {}
171    r['handle'] = handle
172    r['level'] = 1
173    r['info'] = {}
174    r['info']['info1'] = {}
175    r['info']['info1']['formname'] = formname
176    r['info']['info1']['flags'] = 0x0002
177    r['info']['info1']['width'] = 100
178    r['info']['info1']['length'] = 100
179    r['info']['info1']['left'] = 0
180    r['info']['info1']['top'] = 1000
181    r['info']['info1']['right'] = 2000
182    r['info']['info1']['bottom'] = 3000
183
184    try:
185        result = dcerpc.spoolss_AddForm(pipe, r)
186    except dcerpc.WERROR, arg:
187        if arg[0] == dcerpc.WERR_ALREADY_EXISTS:
188            test_DeleteForm(pipe, handle, formname)
189        result = dcerpc.spoolss_AddForm(pipe, r)
190
191    f = test_GetForm(pipe, handle, formname)
192
193    if r['info']['info1'] != f:
194        print 'AddForm: mismatch: %s != %s' % \
195              (r['info']['info1'], f)
196        sys.exit(1)
197
198    r['formname'] = formname
199
200    test_SetForm(pipe, handle, r['info'])
201
202    test_DeleteForm(pipe, handle, formname)
203
204
205def test_EnumJobs(pipe, handle):
206
207    print 'spoolss_EnumJobs()'
208
209    r = {}
210    r['handle'] = handle
211    r['firstjob'] = 0
212    r['numjobs'] = 0xffffffff
213    r['level'] = 1
214
215    result = ResizeBufferCall(dcerpc.spoolss_EnumJobs, pipe, r)
216
217    if result['buffer'] is None:
218        return
219
220    jobs = dcerpc.unmarshall_spoolss_JobInfo_array(
221        result['buffer'], r['level'], result['count'])
222
223    for job in jobs:
224
225        s = {}
226        s['handle'] = handle
227        s['job_id'] = job['info1']['job_id']
228        s['level'] = 1
229
230        result = ResizeBufferCall(dcerpc.spoolss_GetJob, pipe, s)
231
232        if result['info'] != job:
233            print 'EnumJobs: mismatch: %s != %s' % (result['info'], job)
234            sys.exit(1)
235
236
237    # TODO: AddJob, DeleteJob, ScheduleJob
238
239
240def test_EnumPrinterData(pipe, handle):
241
242    print 'test_EnumPrinterData()'
243
244    enum_index = 0
245
246    while 1:
247
248        r = {}
249        r['handle'] = handle
250        r['enum_index'] = enum_index
251
252        r['value_offered'] = 0
253        r['data_size'] = 0
254
255        result = dcerpc.spoolss_EnumPrinterData(pipe, r)
256
257        r['value_offered'] = result['value_needed']
258        r['data_size'] = result['data_size']
259
260        result = dcerpc.spoolss_EnumPrinterData(pipe, r)
261
262        if result['result'] == dcerpc.WERR_NO_MORE_ITEMS:
263            break
264
265        s = {}
266        s['handle'] = handle
267        s['value_name'] = result['value_name']
268
269        result2 = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, s)
270
271        if result['buffer'][:result2['buf_size']] != result2['buffer']:
272            print 'EnumPrinterData/GetPrinterData mismatch'
273            sys.exit(1)
274
275        enum_index += 1
276
277
278def test_SetPrinterDataEx(pipe, handle):
279
280    valuename = '__printerdataextest__'
281    data = '12345'
282
283    r = {}
284    r['handle'] = handle
285    r['key_name'] = 'DsSpooler'
286    r['value_name'] = valuename
287    r['type'] = 3
288    r['buffer'] = data
289    r['buf_size'] = len(data)
290
291    result = dcerpc.spoolss_SetPrinterDataEx(pipe, r)
292
293
294def test_EnumPrinterDataEx(pipe, handle):
295
296    r = {}
297    r['handle'] = handle
298    r['key_name'] = 'DsSpooler'
299    r['buf_size'] = 0
300
301    result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r)
302
303    if result['result'] == dcerpc.WERR_MORE_DATA:
304        r['buf_size'] = result['buf_size']
305
306        result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r)
307
308    # TODO: test spoolss_GetPrinterDataEx()
309
310
311def test_SetPrinterData(pipe, handle):
312
313    print 'testing spoolss_SetPrinterData()'
314
315    valuename = '__printerdatatest__'
316    data = '12345'
317
318    r = {}
319    r['handle'] = handle
320    r['value_name'] = valuename
321    r['type'] = 3                       # REG_BINARY
322    r['buffer'] = data
323    r['real_len'] = 5
324
325    dcerpc.spoolss_SetPrinterData(pipe, r)
326
327    s = {}
328    s['handle'] = handle
329    s['value_name'] = valuename
330
331    result = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, r)
332
333    if result['buffer'] != data:
334        print 'SetPrinterData: mismatch'
335        sys.exit(1)
336
337    dcerpc.spoolss_DeletePrinterData(pipe, r)
338
339
340def test_EnumPrinters(pipe):
341
342    print 'testing spoolss_EnumPrinters()'
343
344    printer_names = None
345
346    r = {}
347    r['flags'] = 0x02
348    r['server'] = None
349
350    for level in [0, 1, 2, 4, 5]:
351
352        print 'test_EnumPrinters(level = %d)' % level
353
354        r['level'] = level
355
356        result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r)
357
358        printers = dcerpc.unmarshall_spoolss_PrinterInfo_array(
359            result['buffer'], r['level'], result['count'])
360
361        if level == 2:
362            for p in printers:
363
364                # A nice check is for the specversion in the
365                # devicemode.  This has always been observed to be
366                # 1025.
367
368                if p['info2']['devmode']['specversion'] != 1025:
369                    print 'test_EnumPrinters: specversion != 1025'
370                    sys.exit(1)
371
372    r['level'] = 1
373    result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r)
374
375    for printer in dcerpc.unmarshall_spoolss_PrinterInfo_array(
376        result['buffer'], r['level'], result['count']):
377
378        if string.find(printer['info1']['name'], '\\\\') == 0:
379            print 'Skipping remote printer %s' % printer['info1']['name']
380            continue
381
382        printername = string.split(printer['info1']['name'], ',')[0]
383
384        handle = test_OpenPrinterEx(pipe, printername)
385
386        test_GetPrinter(pipe, handle)
387        test_EnumPorts(pipe, handle)
388        test_EnumForms(pipe, handle)
389        test_AddForm(pipe, handle)
390        test_EnumJobs(pipe, handle)
391        test_EnumPrinterData(pipe, handle)
392        test_EnumPrinterDataEx(pipe, handle)
393        test_SetPrinterData(pipe, handle)
394#       test_SetPrinterDataEx(pipe, handle)
395        test_ClosePrinter(pipe, handle)
396
397
398def test_EnumPrinterDrivers(pipe):
399
400    print 'test spoolss_EnumPrinterDrivers()'
401
402    for level in [1, 2, 3]:
403
404        r = {}
405        r['server'] = None
406        r['environment'] = None
407        r['level'] = level
408
409        result = ResizeBufferCall(dcerpc.spoolss_EnumPrinterDrivers, pipe, r)
410
411        drivers = dcerpc.unmarshall_spoolss_DriverInfo_array(
412            result['buffer'], r['level'], result['count'])
413
414        if level == 1:
415            driver_names = map(lambda x: x['info1']['driver_name'], drivers)
416
417
418def test_PrintServer(pipe):
419
420    handle = test_OpenPrinterEx(pipe, None)
421
422    # EnumForms and AddForm tests return WERR_BADFID here (??)
423
424    test_ClosePrinter(pipe, handle)
425
426
427def runtests(binding, domain, username, password):
428
429    print 'Testing SPOOLSS pipe'
430
431    pipe = dcerpc.pipe_connect(binding,
432            dcerpc.DCERPC_SPOOLSS_UUID, dcerpc.DCERPC_SPOOLSS_VERSION,
433            domain, username, password)
434
435    test_EnumPrinters(pipe)
436    test_EnumPrinterDrivers(pipe)
437    test_PrintServer(pipe)
438