1/***********************************************************************
2*                                                                      *
3*               This software is part of the ast package               *
4*          Copyright (c) 1990-2011 AT&T Intellectual Property          *
5*                      and is licensed under the                       *
6*                 Eclipse Public License, Version 1.0                  *
7*                    by AT&T Intellectual Property                     *
8*                                                                      *
9*                A copy of the License is available at                 *
10*          http://www.eclipse.org/org/documents/epl-v10.html           *
11*         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12*                                                                      *
13*              Information and Software Systems Research               *
14*                            AT&T Research                             *
15*                           Florham Park NJ                            *
16*                                                                      *
17*                 Glenn Fowler <gsf@research.att.com>                  *
18*                                                                      *
19***********************************************************************/
20#pragma prototyped
21/*
22 * Glenn Fowler
23 * AT&T Research
24 *
25 * send an action to the coshell for execution
26 */
27
28#include "colib.h"
29
30#include <proc.h>
31#include <ls.h>
32
33static Cojob_t*
34service(register Coshell_t* co, Coservice_t* cs, Cojob_t* cj, int flags, Sfio_t* sp)
35{
36	Proc_t*		proc;
37	size_t		n;
38	int		i;
39	int		j;
40	int		fds[2];
41	long		ops[4];
42	char*		s;
43	char**		a;
44
45	if (flags & CO_DEBUG)
46	{
47		for (a = cs->argv; *a; a++)
48			sfprintf(sp, " %s", *a);
49		if (!(s = costash(sp)))
50			goto nospace;
51		errormsg(state.lib, ERROR_LIBRARY|2, "service %s:%s", cs->path, s);
52	}
53	if (pipe(fds) < 0)
54	{
55		errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: cannot allocate service pipe", cs->name);
56		return 0;
57	}
58	if (co->flags & CO_SHELL)
59		for (i = 0; i < elementsof(fds); i++)
60			if (fds[i] < 10 && (j = fcntl(fds[i], F_DUPFD, 10)) >= 0)
61			{
62				close(fds[i]);
63				fds[i] = j;
64			}
65	cs->fd = fds[1];
66	ops[0] = PROC_FD_DUP(fds[0], 0, PROC_FD_PARENT);
67	ops[1] = PROC_FD_CLOSE(fds[1], PROC_FD_CHILD);
68	ops[2] = PROC_FD_DUP(co->gsmfd, 1, 0);
69	ops[3] = 0;
70	if (!(proc = procopen(cs->path, cs->argv, NiL, ops, PROC_DAEMON|PROC_IGNORE)))
71	{
72		errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: cannot connect to %s service", cs->path, cs->name);
73		close(fds[0]);
74		close(fds[1]);
75		return 0;
76	}
77	fcntl(cs->fd, F_SETFD, FD_CLOEXEC);
78	cs->pid = proc->pid;
79	procfree(proc);
80	sfprintf(sp, "id=%d info\n", cj->id);
81	n = sfstrtell(sp);
82	if (!(s = costash(sp)))
83		goto bad;
84	if (write(cs->fd, s, n) != n || sfpoll(&co->msgfp, 1, 5 * 1000) <= 0)
85		goto bad;
86	cj->pid = 0;
87	cj->status = 0;
88	cj->local = 0;
89	cj->service = cs;
90	co->svc_outstanding++;
91	co->svc_running++;
92	if (!cowait(co, cj, -1))
93		goto bad;
94	return cj;
95 bad:
96	errormsg(state.lib, ERROR_LIBRARY|ERROR_SYSTEM|2, "%s: service not responding", cs->name);
97 nospace:
98	cj->pid = CO_PID_FREE;
99	cs->pid = 0;
100	close(cs->fd);
101	cs->fd = -1;
102	return 0;
103}
104
105static Cojob_t*
106request(register Coshell_t* co, Cojob_t* cj, Coservice_t* cs, const char* action, int flags)
107{
108	ssize_t		n;
109	ssize_t		i;
110	Sfio_t*		sp;
111
112	if (!(sp = sfstropen()))
113	{
114		errormsg(state.lib, ERROR_LIBRARY|2, "out of space");
115		return 0;
116	}
117	if (!cs->fd && !service(co, cs, cj, flags, sp))
118		goto bad;
119	if (!cs->pid)
120		goto bad;
121	if (flags & CO_DEBUG)
122		errormsg(state.lib, ERROR_LIBRARY|2, "job %d commands:\n\n%s %s\n", cj->id, cs->name, action);
123	if (!(flags & CO_SILENT))
124		sfprintf(sfstderr, "+ %s %s\n", cs->name, action);
125	sfprintf(sp, "id=%d %s\n", cj->id, action);
126	n = sfstrtell(sp);
127	action = sfstrbase(sp);
128	while ((i = write(cs->fd, action, n)) > 0 && (n -= i) > 0)
129		action += i;
130	sfstrclose(sp);
131	if (n)
132		goto bad;
133	sfclose(sp);
134	cj->pid = 0;
135	cj->status = 0;
136	cj->local = 0;
137	cj->service = cs;
138	co->svc_outstanding++;
139	co->svc_running++;
140	co->total++;
141	return cj;
142 bad:
143	cj->pid = CO_PID_FREE;
144	sfclose(sp);
145	return 0;
146}
147
148Cojob_t*
149coexec(register Coshell_t* co, const char* action, int flags, const char* out, const char* err, const char* att)
150{
151	register Cojob_t*	cj;
152	register Sfio_t*	sp;
153	register Coservice_t*	cs;
154	int			n;
155	int			i;
156	int			og;
157	int			cg;
158	char*			s;
159	char*			t;
160	char*			env;
161	char*			red;
162	char*			sh[4];
163	struct stat		sto;
164	struct stat		ste;
165
166	/*
167	 * get a free job slot
168	 */
169
170	for (cj = co->jobs; cj; cj = cj->next)
171		if (cj->pid == CO_PID_FREE)
172			break;
173	if (cj)
174		cj->service = 0;
175	else if (!(cj = vmnewof(co->vm, 0, Cojob_t, 1, 0)))
176		return 0;
177	else
178	{
179		cj->coshell = co;
180		cj->pid = CO_PID_FREE;
181		cj->id = ++co->slots;
182		cj->next = co->jobs;
183		co->jobs = cj;
184	}
185
186	/*
187	 * set the flags
188	 */
189
190	flags &= ~co->mask;
191	flags |= co->flags;
192	cj->flags = flags;
193
194	/*
195	 * check service intercepts
196	 */
197
198	for (cs = co->service; cs; cs = cs->next)
199	{
200		for (s = cs->name, t = (char*)action; *s && *s == *t; s++, t++);
201		if (!*s && *t == ' ')
202			return request(co, cj, cs, t + 1, flags);
203	}
204	cj->flags &= ~CO_SERVICE;
205	red = (cj->flags & CO_APPEND) ? ">>" : ">";
206
207	/*
208	 * package the action
209	 */
210
211	if (!(env = coinitialize(co, co->flags)))
212		return 0;
213	if (!(sp = sfstropen()))
214		return 0;
215	n = strlen(action);
216	if (co->flags & CO_SERVER)
217	{
218		/*
219		 * leave it to server
220		 */
221
222		sfprintf(sp, "#%05d\ne %d %d %s %s %s",
223			0,
224			cj->id,
225			cj->flags,
226			state.pwd,
227			out,
228			err);
229		if (att)
230			sfprintf(sp, " (%d:%s)", strlen(att), att);
231		else
232			sfprintf(sp, " %s", att);
233		sfprintf(sp, " (%d:%s) (%d:%s)\n", strlen(env), env, n, action);
234	}
235	else if (co->flags & CO_INIT)
236	{
237		if (flags & CO_DEBUG)
238			sfprintf(sp, "set -x\n");
239		sfprintf(sp, "%s%s\necho x %d $? >&$%s\n",
240			env,
241			action,
242			cj->id,
243			CO_ENV_MSGFD);
244	}
245	else if (flags & CO_KSH)
246	{
247#if !_lib_fork && defined(_map_spawnve)
248		Sfio_t*	tp;
249
250		tp = sp;
251		if (!(sp = sfstropen()))
252			sp = tp;
253#endif
254		sfprintf(sp, "{\ntrap 'set %s$?; trap \"\" 0; IFS=\"\n\"; print -u$%s x %d $1 $(times); exit $1' 0 HUP INT QUIT TERM%s\n%s%s%s",
255			(flags & CO_SILENT) ? "" : "+x ",
256			CO_ENV_MSGFD,
257			cj->id,
258			(flags & CO_IGNORE) ? "" : " ERR",
259			env,
260			n > CO_MAXEVAL ? "" : "eval '",
261			(flags & CO_SILENT) ? "" : "set -x\n");
262		if (n > CO_MAXEVAL)
263			sfputr(sp, action, -1);
264		else
265		{
266			coquote(sp, action, 0);
267			sfprintf(sp, "\n'");
268		}
269		sfprintf(sp, "\n} </dev/null");
270		if (out)
271		{
272			if (*out == '/')
273				sfprintf(sp, " %s%s", red, out);
274			else
275				sfprintf(sp, " %s%s/%s", red, state.pwd, out);
276		}
277		else if ((flags & CO_SERIALIZE) && (cj->out = pathtemp(NiL, 64, NiL, "coo", NiL)))
278			sfprintf(sp, " >%s", cj->out);
279		if (err)
280		{
281			if (out && streq(out, err))
282				sfprintf(sp, " 2>&1");
283			else if (*err == '/')
284				sfprintf(sp, " 2%s%s", red, err);
285			else
286				sfprintf(sp, " 2%s%s/%s", red, state.pwd, err);
287		}
288		else if (flags & CO_SERIALIZE)
289		{
290			if (!out && !fstat(1, &sto) && !fstat(2, &ste) && sto.st_dev == ste.st_dev && sto.st_ino == ste.st_ino)
291				sfprintf(sp, " 2>&1");
292			else if (cj->err = pathtemp(NiL, 64, NiL, "coe", NiL))
293				sfprintf(sp, " 2>%s", cj->err);
294		}
295#if !_lib_fork && defined(_map_spawnve)
296		if (sp != tp)
297		{
298			sfprintf(tp, "%s -c '", state.sh);
299			if (!(s = costash(sp)))
300				return 0;
301			coquote(tp, s, 0);
302			sfprintf(tp, "'");
303			sfstrclose(sp);
304			sp = tp;
305		}
306#endif
307		sfprintf(sp, " &\nprint -u$%s j %d $!\n",
308			CO_ENV_MSGFD,
309			cj->id);
310	}
311	else
312	{
313#if !_lib_fork && defined(_map_spawnve)
314		Sfio_t*	tp;
315
316		tp = sp;
317		if (!(sp = sfstropen())) sp = tp;
318#endif
319		flags |= CO_IGNORE;
320		if (co->mode & CO_MODE_SEPARATE)
321		{
322			flags &= ~CO_SERIALIZE;
323			og = '{';
324			cg = '}';
325		}
326		else
327		{
328			og = '(';
329			cg = ')';
330		}
331		sfprintf(sp, "%c\n%s%sset -%s%s\n",
332			og,
333			env,
334			n > CO_MAXEVAL ? "" : "eval '",
335			(flags & CO_IGNORE) ? "" : "e",
336			(flags & CO_SILENT) ? "" : "x");
337		if (n > CO_MAXEVAL)
338			sfprintf(sp, "%s", action);
339		else
340		{
341			coquote(sp, action, 0);
342			sfprintf(sp, "\n'");
343		}
344		sfprintf(sp, "\n%c </dev/null", cg);
345		if (out)
346		{
347			if (*out == '/')
348				sfprintf(sp, " %s%s", red, out);
349			else
350				sfprintf(sp, " %s%s/%s", red, state.pwd, out);
351		}
352		else if ((flags & CO_SERIALIZE) && (cj->out = pathtemp(NiL, 64, NiL, "coo", NiL)))
353			sfprintf(sp, " >%s", cj->out);
354		if (err)
355		{
356			if (out && streq(out, err))
357				sfprintf(sp, " 2>&1");
358			else if (*err == '/')
359				sfprintf(sp, " 2%s%s", red, err);
360			else
361				sfprintf(sp, " 2%s%s/%s", red, state.pwd, err);
362		}
363		else if (flags & CO_SERIALIZE)
364		{
365			if (out)
366				sfprintf(sp, " 2>&1");
367			else if (cj->err = pathtemp(NiL, 64, NiL, "coe", NiL))
368				sfprintf(sp, " 2>%s", cj->err);
369		}
370		if (!(co->mode & CO_MODE_SEPARATE))
371		{
372			if (flags & CO_OSH)
373				sfprintf(sp, " && echo x %d 0 >&$%s || echo x %d $? >&$%s",
374					cj->id,
375					CO_ENV_MSGFD,
376					cj->id,
377					CO_ENV_MSGFD);
378			else
379				sfprintf(sp, " && echo x %d 0 `times` >&$%s || echo x %d $? `times` >&$%s",
380					cj->id,
381					CO_ENV_MSGFD,
382					cj->id,
383					CO_ENV_MSGFD);
384		}
385#if !_lib_fork && defined(_map_spawnve)
386		if (sp != tp)
387		{
388			sfprintf(tp, "%s -c '", state.sh);
389			if (!(s = costash(sp)))
390				return 0;
391			coquote(tp, s, 0);
392			sfprintf(tp, "'");
393			sfstrclose(sp);
394			sp = tp;
395		}
396#endif
397		if (!(co->mode & CO_MODE_SEPARATE))
398			sfprintf(sp, " &\necho j %d $! >&$%s\n",
399				cj->id,
400				CO_ENV_MSGFD);
401	}
402	n = sfstrtell(sp);
403	if (!costash(sp))
404		return 0;
405	if (flags & CO_SERVER)
406		sfprintf(sp, "#%05d\n", n - 7);
407	s = sfstrseek(sp, 0, SEEK_SET);
408	if (flags & CO_DEBUG)
409		errormsg(state.lib, ERROR_LIBRARY|2, "job %d commands:\n\n%s\n", cj->id, s);
410	if (co->mode & CO_MODE_SEPARATE)
411	{
412		sh[0] = state.sh;
413		sh[1] = "-c";
414		sh[2] = s;
415		sh[3] = 0;
416		cj->status = procrun(state.sh, sh, 0);
417		sfstrclose(sp);
418		cj->pid = CO_PID_ZOMBIE;
419		cj->local = 0;
420		co->outstanding++;
421		co->total++;
422	}
423	else
424	{
425		/*
426		 * send it off
427		 */
428
429		while ((i = write(co->cmdfd, s, n)) > 0 && (n -= i) > 0)
430			s += i;
431		sfstrclose(sp);
432		if (n)
433			return 0;
434
435		/*
436		 * it's a job
437		 */
438
439		cj->pid = 0;
440		cj->status = 0;
441		cj->local = 0;
442		co->outstanding++;
443		co->running++;
444		co->total++;
445		if (co->mode & CO_MODE_ACK)
446			cj = cowait(co, cj, -1);
447	}
448	return cj;
449}
450