vchiq_core.c revision 1.2
1/**
2 * Copyright (c) 2010-2012 Broadcom. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 *    notice, this list of conditions, and the following disclaimer,
9 *    without modification.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. The names of the above-listed copyright holders may not be used
14 *    to endorse or promote products derived from this software without
15 *    specific prior written permission.
16 *
17 * ALTERNATIVELY, this software may be distributed under the terms of the
18 * GNU General Public License ("GPL") version 2, as published by the Free
19 * Software Foundation.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
22 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
23 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34#include "vchiq_core.h"
35
36#define VCHIQ_SLOT_HANDLER_STACK 8192
37
38#define HANDLE_STATE_SHIFT 12
39
40#define SLOT_INFO_FROM_INDEX(state, index) (state->slot_info + (index))
41#define SLOT_DATA_FROM_INDEX(state, index) (state->slot_data + (index))
42#define SLOT_INDEX_FROM_DATA(state, data) \
43	(((unsigned int)((char *)data - (char *)state->slot_data)) / \
44	VCHIQ_SLOT_SIZE)
45#define SLOT_INDEX_FROM_INFO(state, info) \
46	((unsigned int)(info - state->slot_info))
47#define SLOT_QUEUE_INDEX_FROM_POS(pos) \
48	((int)((unsigned int)(pos) / VCHIQ_SLOT_SIZE))
49
50
51#define BULK_INDEX(x) (x & (VCHIQ_NUM_SERVICE_BULKS - 1))
52
53
54struct vchiq_open_payload {
55	int fourcc;
56	int client_id;
57	short version;
58	short version_min;
59};
60
61struct vchiq_openack_payload {
62	short version;
63};
64
65/* we require this for consistency between endpoints */
66vchiq_static_assert(sizeof(VCHIQ_HEADER_T) == 8);
67vchiq_static_assert(IS_POW2(sizeof(VCHIQ_HEADER_T)));
68vchiq_static_assert(IS_POW2(VCHIQ_NUM_CURRENT_BULKS));
69vchiq_static_assert(IS_POW2(VCHIQ_NUM_SERVICE_BULKS));
70vchiq_static_assert(IS_POW2(VCHIQ_MAX_SERVICES));
71vchiq_static_assert(VCHIQ_VERSION >= VCHIQ_VERSION_MIN);
72
73/* Run time control of log level, based on KERN_XXX level. */
74int vchiq_core_log_level = VCHIQ_LOG_DEFAULT;
75int vchiq_core_msg_log_level = VCHIQ_LOG_DEFAULT;
76int vchiq_sync_log_level = VCHIQ_LOG_DEFAULT;
77
78static atomic_t pause_bulks_count = ATOMIC_INIT(0);
79
80DEFINE_SPINLOCK(service_spinlock);
81DEFINE_SPINLOCK(bulk_waiter_spinlock);
82DEFINE_SPINLOCK(quota_spinlock);
83
84void
85vchiq_core_initialize(void)
86{
87	spin_lock_init(&service_spinlock);
88	spin_lock_init(&bulk_waiter_spinlock);
89	spin_lock_init(&quota_spinlock);
90}
91
92VCHIQ_STATE_T *vchiq_states[VCHIQ_MAX_STATES];
93static unsigned int handle_seq;
94
95static const char *const srvstate_names[] = {
96	"FREE",
97	"HIDDEN",
98	"LISTENING",
99	"OPENING",
100	"OPEN",
101	"OPENSYNC",
102	"CLOSESENT",
103	"CLOSERECVD",
104	"CLOSEWAIT",
105	"CLOSED"
106};
107
108static const char *const reason_names[] = {
109	"SERVICE_OPENED",
110	"SERVICE_CLOSED",
111	"MESSAGE_AVAILABLE",
112	"BULK_TRANSMIT_DONE",
113	"BULK_RECEIVE_DONE",
114	"BULK_TRANSMIT_ABORTED",
115	"BULK_RECEIVE_ABORTED"
116};
117
118static const char *const conn_state_names[] = {
119	"DISCONNECTED",
120	"CONNECTING",
121	"CONNECTED",
122	"PAUSING",
123	"PAUSE_SENT",
124	"PAUSED",
125	"RESUMING",
126	"PAUSE_TIMEOUT",
127	"RESUME_TIMEOUT"
128};
129
130
131static void
132release_message_sync(VCHIQ_STATE_T *state, VCHIQ_HEADER_T *header);
133
134static const char *msg_type_str(unsigned int msg_type)
135{
136	switch (msg_type) {
137	case VCHIQ_MSG_PADDING:       return "PADDING";
138	case VCHIQ_MSG_CONNECT:       return "CONNECT";
139	case VCHIQ_MSG_OPEN:          return "OPEN";
140	case VCHIQ_MSG_OPENACK:       return "OPENACK";
141	case VCHIQ_MSG_CLOSE:         return "CLOSE";
142	case VCHIQ_MSG_DATA:          return "DATA";
143	case VCHIQ_MSG_BULK_RX:       return "BULK_RX";
144	case VCHIQ_MSG_BULK_TX:       return "BULK_TX";
145	case VCHIQ_MSG_BULK_RX_DONE:  return "BULK_RX_DONE";
146	case VCHIQ_MSG_BULK_TX_DONE:  return "BULK_TX_DONE";
147	case VCHIQ_MSG_PAUSE:         return "PAUSE";
148	case VCHIQ_MSG_RESUME:        return "RESUME";
149	case VCHIQ_MSG_REMOTE_USE:    return "REMOTE_USE";
150	case VCHIQ_MSG_REMOTE_RELEASE:      return "REMOTE_RELEASE";
151	case VCHIQ_MSG_REMOTE_USE_ACTIVE:   return "REMOTE_USE_ACTIVE";
152	}
153	return "???";
154}
155
156static inline void
157vchiq_set_service_state(VCHIQ_SERVICE_T *service, int newstate)
158{
159	vchiq_log_info(vchiq_core_log_level, "%d: srv:%d %s->%s",
160		service->state->id, service->localport,
161		srvstate_names[service->srvstate],
162		srvstate_names[newstate]);
163	service->srvstate = newstate;
164}
165
166VCHIQ_SERVICE_T *
167find_service_by_handle(VCHIQ_SERVICE_HANDLE_T handle)
168{
169	VCHIQ_SERVICE_T *service;
170
171	spin_lock(&service_spinlock);
172	service = handle_to_service(handle);
173	if (service && (service->srvstate != VCHIQ_SRVSTATE_FREE) &&
174		(service->handle == handle)) {
175		BUG_ON(service->ref_count == 0);
176		service->ref_count++;
177	} else
178		service = NULL;
179	spin_unlock(&service_spinlock);
180
181	if (!service)
182		vchiq_log_info(vchiq_core_log_level,
183			"Invalid service handle 0x%x", handle);
184
185	return service;
186}
187
188VCHIQ_SERVICE_T *
189find_service_by_port(VCHIQ_STATE_T *state, int localport)
190{
191	VCHIQ_SERVICE_T *service = NULL;
192	if ((unsigned int)localport <= VCHIQ_PORT_MAX) {
193		spin_lock(&service_spinlock);
194		service = state->services[localport];
195		if (service && (service->srvstate != VCHIQ_SRVSTATE_FREE)) {
196			BUG_ON(service->ref_count == 0);
197			service->ref_count++;
198		} else
199			service = NULL;
200		spin_unlock(&service_spinlock);
201	}
202
203	if (!service)
204		vchiq_log_info(vchiq_core_log_level,
205			"Invalid port %d", localport);
206
207	return service;
208}
209
210VCHIQ_SERVICE_T *
211find_service_for_instance(VCHIQ_INSTANCE_T instance,
212	VCHIQ_SERVICE_HANDLE_T handle) {
213	VCHIQ_SERVICE_T *service;
214
215	spin_lock(&service_spinlock);
216	service = handle_to_service(handle);
217	if (service && (service->srvstate != VCHIQ_SRVSTATE_FREE) &&
218		(service->handle == handle) &&
219		(service->instance == instance)) {
220		BUG_ON(service->ref_count == 0);
221		service->ref_count++;
222	} else
223		service = NULL;
224	spin_unlock(&service_spinlock);
225
226	if (!service)
227		vchiq_log_info(vchiq_core_log_level,
228			"Invalid service handle 0x%x", handle);
229
230	return service;
231}
232
233VCHIQ_SERVICE_T *
234next_service_by_instance(VCHIQ_STATE_T *state, VCHIQ_INSTANCE_T instance,
235	int *pidx)
236{
237	VCHIQ_SERVICE_T *service = NULL;
238	int idx = *pidx;
239
240	spin_lock(&service_spinlock);
241	while (idx < state->unused_service) {
242		VCHIQ_SERVICE_T *srv = state->services[idx++];
243		if (srv && (srv->srvstate != VCHIQ_SRVSTATE_FREE) &&
244			(srv->instance == instance)) {
245			service = srv;
246			BUG_ON(service->ref_count == 0);
247			service->ref_count++;
248			break;
249		}
250	}
251	spin_unlock(&service_spinlock);
252
253	*pidx = idx;
254
255	return service;
256}
257
258void
259lock_service(VCHIQ_SERVICE_T *service)
260{
261	spin_lock(&service_spinlock);
262	BUG_ON(!service || (service->ref_count == 0));
263	if (service)
264		service->ref_count++;
265	spin_unlock(&service_spinlock);
266}
267
268void
269unlock_service(VCHIQ_SERVICE_T *service)
270{
271	VCHIQ_STATE_T *state = service->state;
272	spin_lock(&service_spinlock);
273	BUG_ON(!service || (service->ref_count == 0));
274	if (service && service->ref_count) {
275		service->ref_count--;
276		if (!service->ref_count) {
277			BUG_ON(service->srvstate != VCHIQ_SRVSTATE_FREE);
278			state->services[service->localport] = NULL;
279		} else
280			service = NULL;
281	}
282	spin_unlock(&service_spinlock);
283
284	kfree(service);
285}
286
287int
288vchiq_get_client_id(VCHIQ_SERVICE_HANDLE_T handle)
289{
290	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
291	int id;
292
293	id = service ? service->client_id : 0;
294	if (service)
295		unlock_service(service);
296
297	return id;
298}
299
300void *
301vchiq_get_service_userdata(VCHIQ_SERVICE_HANDLE_T handle)
302{
303	VCHIQ_SERVICE_T *service = handle_to_service(handle);
304
305	return service ? service->base.userdata : NULL;
306}
307
308int
309vchiq_get_service_fourcc(VCHIQ_SERVICE_HANDLE_T handle)
310{
311	VCHIQ_SERVICE_T *service = handle_to_service(handle);
312
313	return service ? service->base.fourcc : 0;
314}
315
316static void
317mark_service_closing_internal(VCHIQ_SERVICE_T *service, int sh_thread)
318{
319	VCHIQ_STATE_T *state = service->state;
320	VCHIQ_SERVICE_QUOTA_T *service_quota;
321
322	service->closing = 1;
323
324	/* Synchronise with other threads. */
325	lmutex_lock(&state->recycle_mutex);
326	lmutex_unlock(&state->recycle_mutex);
327	if (!sh_thread || (state->conn_state != VCHIQ_CONNSTATE_PAUSE_SENT)) {
328		/* If we're pausing then the slot_mutex is held until resume
329		 * by the slot handler.  Therefore don't try to acquire this
330		 * mutex if we're the slot handler and in the pause sent state.
331		 * We don't need to in this case anyway. */
332		lmutex_lock(&state->slot_mutex);
333		lmutex_unlock(&state->slot_mutex);
334	}
335
336	/* Unblock any sending thread. */
337	service_quota = &state->service_quotas[service->localport];
338	up(&service_quota->quota_event);
339}
340
341static void
342mark_service_closing(VCHIQ_SERVICE_T *service)
343{
344	mark_service_closing_internal(service, 0);
345}
346
347static inline VCHIQ_STATUS_T
348make_service_callback(VCHIQ_SERVICE_T *service, VCHIQ_REASON_T reason,
349	VCHIQ_HEADER_T *header, void *bulk_userdata)
350{
351	VCHIQ_STATUS_T status;
352	vchiq_log_trace(vchiq_core_log_level, "%d: callback:%d (%s, %x, %x)",
353		service->state->id, service->localport, reason_names[reason],
354		(unsigned int)header, (unsigned int)bulk_userdata);
355	status = service->base.callback(reason, header, service->handle,
356		bulk_userdata);
357	if (status == VCHIQ_ERROR) {
358		vchiq_log_warning(vchiq_core_log_level,
359			"%d: ignoring ERROR from callback to service %x",
360			service->state->id, service->handle);
361		status = VCHIQ_SUCCESS;
362	}
363	return status;
364}
365
366inline void
367vchiq_set_conn_state(VCHIQ_STATE_T *state, VCHIQ_CONNSTATE_T newstate)
368{
369	VCHIQ_CONNSTATE_T oldstate = state->conn_state;
370	vchiq_log_info(vchiq_core_log_level, "%d: %s->%s", state->id,
371		conn_state_names[oldstate],
372		conn_state_names[newstate]);
373	state->conn_state = newstate;
374	vchiq_platform_conn_state_changed(state, oldstate, newstate);
375}
376
377static inline void
378remote_event_create(REMOTE_EVENT_T *event)
379{
380	event->armed = 0;
381	/* Don't clear the 'fired' flag because it may already have been set
382	** by the other side. */
383	event->event->value = 0;
384}
385
386static inline void
387remote_event_destroy(REMOTE_EVENT_T *event)
388{
389	(void)event;
390}
391
392static inline int
393remote_event_wait(REMOTE_EVENT_T *event)
394{
395	if (!event->fired) {
396		event->armed = 1;
397		dsb();
398		if (!event->fired) {
399			if (down_interruptible(event->event) != 0) {
400				event->armed = 0;
401				return 0;
402			}
403		}
404		event->armed = 0;
405		wmb();
406	}
407
408	event->fired = 0;
409	return 1;
410}
411
412static inline void
413remote_event_signal_local(REMOTE_EVENT_T *event)
414{
415	event->armed = 0;
416	up(event->event);
417}
418
419static inline void
420remote_event_poll(REMOTE_EVENT_T *event)
421{
422	if (event->fired && event->armed)
423		remote_event_signal_local(event);
424}
425
426void
427remote_event_pollall(VCHIQ_STATE_T *state)
428{
429	remote_event_poll(&state->local->sync_trigger);
430	remote_event_poll(&state->local->sync_release);
431	remote_event_poll(&state->local->trigger);
432	remote_event_poll(&state->local->recycle);
433}
434
435/* Round up message sizes so that any space at the end of a slot is always big
436** enough for a header. This relies on header size being a power of two, which
437** has been verified earlier by a static assertion. */
438
439static inline unsigned int
440calc_stride(unsigned int size)
441{
442	/* Allow room for the header */
443	size += sizeof(VCHIQ_HEADER_T);
444
445	/* Round up */
446	return (size + sizeof(VCHIQ_HEADER_T) - 1) & ~(sizeof(VCHIQ_HEADER_T)
447		- 1);
448}
449
450/* Called by the slot handler thread */
451static VCHIQ_SERVICE_T *
452get_listening_service(VCHIQ_STATE_T *state, int fourcc)
453{
454	int i;
455
456	WARN_ON(fourcc == VCHIQ_FOURCC_INVALID);
457
458	for (i = 0; i < state->unused_service; i++) {
459		VCHIQ_SERVICE_T *service = state->services[i];
460		if (service &&
461			(service->public_fourcc == fourcc) &&
462			((service->srvstate == VCHIQ_SRVSTATE_LISTENING) ||
463			((service->srvstate == VCHIQ_SRVSTATE_OPEN) &&
464			(service->remoteport == VCHIQ_PORT_FREE)))) {
465			lock_service(service);
466			return service;
467		}
468	}
469
470	return NULL;
471}
472
473/* Called by the slot handler thread */
474static VCHIQ_SERVICE_T *
475get_connected_service(VCHIQ_STATE_T *state, unsigned int port)
476{
477	int i;
478	for (i = 0; i < state->unused_service; i++) {
479		VCHIQ_SERVICE_T *service = state->services[i];
480		if (service && (service->srvstate == VCHIQ_SRVSTATE_OPEN)
481			&& (service->remoteport == port)) {
482			lock_service(service);
483			return service;
484		}
485	}
486	return NULL;
487}
488
489inline void
490request_poll(VCHIQ_STATE_T *state, VCHIQ_SERVICE_T *service, int poll_type)
491{
492	uint32_t value;
493
494	if (service) {
495		do {
496			value = atomic_read(&service->poll_flags);
497		} while (atomic_cmpxchg(&service->poll_flags, value,
498			value | (1 << poll_type)) != value);
499
500		do {
501			value = atomic_read(&state->poll_services[
502				service->localport>>5]);
503		} while (atomic_cmpxchg(
504			&state->poll_services[service->localport>>5],
505			value, value | (1 << (service->localport & 0x1f)))
506			!= value);
507	}
508
509	state->poll_needed = 1;
510	wmb();
511
512	/* ... and ensure the slot handler runs. */
513	remote_event_signal_local(&state->local->trigger);
514}
515
516/* Called from queue_message, by the slot handler and application threads,
517** with slot_mutex held */
518static VCHIQ_HEADER_T *
519reserve_space(VCHIQ_STATE_T *state, int space, int is_blocking)
520{
521	VCHIQ_SHARED_STATE_T *local = state->local;
522	int tx_pos = state->local_tx_pos;
523	int slot_space = VCHIQ_SLOT_SIZE - (tx_pos & VCHIQ_SLOT_MASK);
524
525	if (space > slot_space) {
526		VCHIQ_HEADER_T *header;
527		/* Fill the remaining space with padding */
528		WARN_ON(state->tx_data == NULL);
529		header = (VCHIQ_HEADER_T *)
530			(state->tx_data + (tx_pos & VCHIQ_SLOT_MASK));
531		header->msgid = VCHIQ_MSGID_PADDING;
532		header->size = slot_space - sizeof(VCHIQ_HEADER_T);
533
534		tx_pos += slot_space;
535	}
536
537	/* If necessary, get the next slot. */
538	if ((tx_pos & VCHIQ_SLOT_MASK) == 0) {
539		int slot_index;
540
541		/* If there is no free slot... */
542
543		if (down_trylock(&state->slot_available_event) != 0) {
544			/* ...wait for one. */
545
546			VCHIQ_STATS_INC(state, slot_stalls);
547
548			/* But first, flush through the last slot. */
549			state->local_tx_pos = tx_pos;
550			local->tx_pos = tx_pos;
551			remote_event_signal(&state->remote->trigger);
552
553			if (!is_blocking ||
554				(down_interruptible(
555				&state->slot_available_event) != 0))
556				return NULL; /* No space available */
557		}
558
559		BUG_ON(tx_pos ==
560			(state->slot_queue_available * VCHIQ_SLOT_SIZE));
561
562		slot_index = local->slot_queue[
563			SLOT_QUEUE_INDEX_FROM_POS(tx_pos) &
564			VCHIQ_SLOT_QUEUE_MASK];
565		state->tx_data =
566			(char *)SLOT_DATA_FROM_INDEX(state, slot_index);
567	}
568
569	state->local_tx_pos = tx_pos + space;
570
571	return (VCHIQ_HEADER_T *)(state->tx_data + (tx_pos & VCHIQ_SLOT_MASK));
572}
573
574/* Called by the recycle thread. */
575static void
576process_free_queue(VCHIQ_STATE_T *state)
577{
578	VCHIQ_SHARED_STATE_T *local = state->local;
579	BITSET_T service_found[BITSET_SIZE(VCHIQ_MAX_SERVICES)];
580	int slot_queue_available;
581
582	/* Use a read memory barrier to ensure that any state that may have
583	** been modified by another thread is not masked by stale prefetched
584	** values. */
585	rmb();
586
587	/* Find slots which have been freed by the other side, and return them
588	** to the available queue. */
589	slot_queue_available = state->slot_queue_available;
590
591	while (slot_queue_available != local->slot_queue_recycle) {
592		unsigned int pos;
593		int slot_index = local->slot_queue[slot_queue_available++ &
594			VCHIQ_SLOT_QUEUE_MASK];
595		char *data = (char *)SLOT_DATA_FROM_INDEX(state, slot_index);
596		int data_found = 0;
597
598		vchiq_log_trace(vchiq_core_log_level, "%d: pfq %d=%x %x %x",
599			state->id, slot_index, (unsigned int)data,
600			local->slot_queue_recycle, slot_queue_available);
601
602		/* Initialise the bitmask for services which have used this
603		** slot */
604		BITSET_ZERO(service_found);
605
606		pos = 0;
607
608		while (pos < VCHIQ_SLOT_SIZE) {
609			VCHIQ_HEADER_T *header =
610				(VCHIQ_HEADER_T *)(data + pos);
611			int msgid = header->msgid;
612			if (VCHIQ_MSG_TYPE(msgid) == VCHIQ_MSG_DATA) {
613				int port = VCHIQ_MSG_SRCPORT(msgid);
614				VCHIQ_SERVICE_QUOTA_T *service_quota =
615					&state->service_quotas[port];
616				int count;
617				spin_lock(&quota_spinlock);
618				count = service_quota->message_use_count;
619				if (count > 0)
620					service_quota->message_use_count =
621						count - 1;
622				spin_unlock(&quota_spinlock);
623
624				if (count == service_quota->message_quota)
625					/* Signal the service that it
626					** has dropped below its quota
627					*/
628					up(&service_quota->quota_event);
629				else if (count == 0) {
630					vchiq_log_error(vchiq_core_log_level,
631						"service %d "
632						"message_use_count=%d "
633						"(header %x, msgid %x, "
634						"header->msgid %x, "
635						"header->size %x)",
636						port,
637						service_quota->
638							message_use_count,
639						(unsigned int)header, msgid,
640						header->msgid,
641						header->size);
642					WARN(1, "invalid message use count\n");
643				}
644				if (!BITSET_IS_SET(service_found, port)) {
645					/* Set the found bit for this service */
646					BITSET_SET(service_found, port);
647
648					spin_lock(&quota_spinlock);
649					count = service_quota->slot_use_count;
650					if (count > 0)
651						service_quota->slot_use_count =
652							count - 1;
653					spin_unlock(&quota_spinlock);
654
655					if (count > 0) {
656						/* Signal the service in case
657						** it has dropped below its
658						** quota */
659						up(&service_quota->quota_event);
660						vchiq_log_trace(
661							vchiq_core_log_level,
662							"%d: pfq:%d %x@%x - "
663							"slot_use->%d",
664							state->id, port,
665							header->size,
666							(unsigned int)header,
667							count - 1);
668					} else {
669						vchiq_log_error(
670							vchiq_core_log_level,
671								"service %d "
672								"slot_use_count"
673								"=%d (header %x"
674								", msgid %x, "
675								"header->msgid"
676								" %x, header->"
677								"size %x)",
678							port, count,
679							(unsigned int)header,
680							msgid,
681							header->msgid,
682							header->size);
683						WARN(1, "bad slot use count\n");
684					}
685				}
686
687				data_found = 1;
688			}
689
690			pos += calc_stride(header->size);
691			if (pos > VCHIQ_SLOT_SIZE) {
692				vchiq_log_error(vchiq_core_log_level,
693					"pfq - pos %x: header %x, msgid %x, "
694					"header->msgid %x, header->size %x",
695					pos, (unsigned int)header, msgid,
696					header->msgid, header->size);
697				WARN(1, "invalid slot position\n");
698			}
699		}
700
701		if (data_found) {
702			int count;
703			spin_lock(&quota_spinlock);
704			count = state->data_use_count;
705			if (count > 0)
706				state->data_use_count =
707					count - 1;
708			spin_unlock(&quota_spinlock);
709			if (count == state->data_quota)
710				up(&state->data_quota_event);
711		}
712
713		state->slot_queue_available = slot_queue_available;
714		up(&state->slot_available_event);
715	}
716}
717
718/* Called by the slot handler and application threads */
719static VCHIQ_STATUS_T
720queue_message(VCHIQ_STATE_T *state, VCHIQ_SERVICE_T *service,
721	int msgid, const VCHIQ_ELEMENT_T *elements,
722	int count, int size, int is_blocking)
723{
724	VCHIQ_SHARED_STATE_T *local;
725	VCHIQ_SERVICE_QUOTA_T *service_quota = NULL;
726	VCHIQ_HEADER_T *header;
727	int type = VCHIQ_MSG_TYPE(msgid);
728
729	unsigned int stride;
730
731	local = state->local;
732
733	stride = calc_stride(size);
734
735	WARN_ON(!(stride <= VCHIQ_SLOT_SIZE));
736
737	if ((type != VCHIQ_MSG_RESUME) &&
738		(lmutex_lock_interruptible(&state->slot_mutex) != 0))
739		return VCHIQ_RETRY;
740
741	if (type == VCHIQ_MSG_DATA) {
742		int tx_end_index;
743
744		BUG_ON(!service);
745
746		if (service->closing) {
747			/* The service has been closed */
748			lmutex_unlock(&state->slot_mutex);
749			return VCHIQ_ERROR;
750		}
751
752		service_quota = &state->service_quotas[service->localport];
753
754		spin_lock(&quota_spinlock);
755
756		/* Ensure this service doesn't use more than its quota of
757		** messages or slots */
758		tx_end_index = SLOT_QUEUE_INDEX_FROM_POS(
759			state->local_tx_pos + stride - 1);
760
761		/* Ensure data messages don't use more than their quota of
762		** slots */
763		while ((tx_end_index != state->previous_data_index) &&
764			(state->data_use_count == state->data_quota)) {
765			VCHIQ_STATS_INC(state, data_stalls);
766			spin_unlock(&quota_spinlock);
767			lmutex_unlock(&state->slot_mutex);
768
769			if (down_interruptible(&state->data_quota_event)
770				!= 0)
771				return VCHIQ_RETRY;
772
773			lmutex_lock(&state->slot_mutex);
774			spin_lock(&quota_spinlock);
775			tx_end_index = SLOT_QUEUE_INDEX_FROM_POS(
776				state->local_tx_pos + stride - 1);
777			if ((tx_end_index == state->previous_data_index) ||
778				(state->data_use_count < state->data_quota)) {
779				/* Pass the signal on to other waiters */
780				up(&state->data_quota_event);
781				break;
782			}
783		}
784
785		while ((service_quota->message_use_count ==
786				service_quota->message_quota) ||
787			((tx_end_index != service_quota->previous_tx_index) &&
788			(service_quota->slot_use_count ==
789				service_quota->slot_quota))) {
790			spin_unlock(&quota_spinlock);
791			vchiq_log_trace(vchiq_core_log_level,
792				"%d: qm:%d %s,%x - quota stall "
793				"(msg %d, slot %d)",
794				state->id, service->localport,
795				msg_type_str(type), size,
796				service_quota->message_use_count,
797				service_quota->slot_use_count);
798			VCHIQ_SERVICE_STATS_INC(service, quota_stalls);
799			lmutex_unlock(&state->slot_mutex);
800			if (down_interruptible(&service_quota->quota_event)
801				!= 0)
802				return VCHIQ_RETRY;
803			if (service->closing)
804				return VCHIQ_ERROR;
805			if (lmutex_lock_interruptible(&state->slot_mutex) != 0)
806				return VCHIQ_RETRY;
807			if (service->srvstate != VCHIQ_SRVSTATE_OPEN) {
808				/* The service has been closed */
809				lmutex_unlock(&state->slot_mutex);
810				return VCHIQ_ERROR;
811			}
812			spin_lock(&quota_spinlock);
813			tx_end_index = SLOT_QUEUE_INDEX_FROM_POS(
814				state->local_tx_pos + stride - 1);
815		}
816
817		spin_unlock(&quota_spinlock);
818	}
819
820	header = reserve_space(state, stride, is_blocking);
821
822	if (!header) {
823		if (service)
824			VCHIQ_SERVICE_STATS_INC(service, slot_stalls);
825		lmutex_unlock(&state->slot_mutex);
826		return VCHIQ_RETRY;
827	}
828
829	if (type == VCHIQ_MSG_DATA) {
830		int i, pos;
831		int tx_end_index;
832		int slot_use_count;
833
834		vchiq_log_info(vchiq_core_log_level,
835			"%d: qm %s@%x,%x (%d->%d)",
836			state->id,
837			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
838			(unsigned int)header, size,
839			VCHIQ_MSG_SRCPORT(msgid),
840			VCHIQ_MSG_DSTPORT(msgid));
841
842		BUG_ON(!service);
843
844		for (i = 0, pos = 0; i < (unsigned int)count;
845			pos += elements[i++].size)
846			if (elements[i].size) {
847				if (vchiq_copy_from_user
848					(header->data + pos, elements[i].data,
849					(size_t) elements[i].size) !=
850					VCHIQ_SUCCESS) {
851					lmutex_unlock(&state->slot_mutex);
852					VCHIQ_SERVICE_STATS_INC(service,
853						error_count);
854					return VCHIQ_ERROR;
855				}
856				if (i == 0) {
857					if (vchiq_core_msg_log_level >=
858						VCHIQ_LOG_INFO)
859						vchiq_log_dump_mem("Sent", 0,
860							header->data + pos,
861							min(64,
862							elements[0].size));
863				}
864			}
865
866		spin_lock(&quota_spinlock);
867		service_quota->message_use_count++;
868
869		tx_end_index =
870			SLOT_QUEUE_INDEX_FROM_POS(state->local_tx_pos - 1);
871
872		/* If this transmission can't fit in the last slot used by any
873		** service, the data_use_count must be increased. */
874		if (tx_end_index != state->previous_data_index) {
875			state->previous_data_index = tx_end_index;
876			state->data_use_count++;
877		}
878
879		/* If this isn't the same slot last used by this service,
880		** the service's slot_use_count must be increased. */
881		if (tx_end_index != service_quota->previous_tx_index) {
882			service_quota->previous_tx_index = tx_end_index;
883			slot_use_count = ++service_quota->slot_use_count;
884		} else {
885			slot_use_count = 0;
886		}
887
888		spin_unlock(&quota_spinlock);
889
890		if (slot_use_count)
891			vchiq_log_trace(vchiq_core_log_level,
892				"%d: qm:%d %s,%x - slot_use->%d (hdr %p)",
893				state->id, service->localport,
894				msg_type_str(VCHIQ_MSG_TYPE(msgid)), size,
895				slot_use_count, header);
896
897		VCHIQ_SERVICE_STATS_INC(service, ctrl_tx_count);
898		VCHIQ_SERVICE_STATS_ADD(service, ctrl_tx_bytes, size);
899	} else {
900		vchiq_log_info(vchiq_core_log_level,
901			"%d: qm %s@%x,%x (%d->%d)", state->id,
902			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
903			(unsigned int)header, size,
904			VCHIQ_MSG_SRCPORT(msgid),
905			VCHIQ_MSG_DSTPORT(msgid));
906		if (size != 0) {
907			WARN_ON(!((count == 1) && (size == elements[0].size)));
908			memcpy(header->data, elements[0].data,
909				elements[0].size);
910		}
911		VCHIQ_STATS_INC(state, ctrl_tx_count);
912	}
913
914	header->msgid = msgid;
915	header->size = size;
916
917	{
918		int svc_fourcc;
919
920		svc_fourcc = service
921			? service->base.fourcc
922			: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
923
924		vchiq_log_info(vchiq_core_msg_log_level,
925			"Sent Msg %s(%u) to %c%c%c%c s:%u d:%d len:%d",
926			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
927			VCHIQ_MSG_TYPE(msgid),
928			VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
929			VCHIQ_MSG_SRCPORT(msgid),
930			VCHIQ_MSG_DSTPORT(msgid),
931			size);
932	}
933
934	/* Make sure the new header is visible to the peer. */
935	wmb();
936
937	/* Make the new tx_pos visible to the peer. */
938	local->tx_pos = state->local_tx_pos;
939	wmb();
940
941	if (service && (type == VCHIQ_MSG_CLOSE))
942		vchiq_set_service_state(service, VCHIQ_SRVSTATE_CLOSESENT);
943
944	if (VCHIQ_MSG_TYPE(msgid) != VCHIQ_MSG_PAUSE)
945		lmutex_unlock(&state->slot_mutex);
946
947	remote_event_signal(&state->remote->trigger);
948
949	return VCHIQ_SUCCESS;
950}
951
952/* Called by the slot handler and application threads */
953static VCHIQ_STATUS_T
954queue_message_sync(VCHIQ_STATE_T *state, VCHIQ_SERVICE_T *service,
955	int msgid, const VCHIQ_ELEMENT_T *elements,
956	int count, int size, int is_blocking)
957{
958	VCHIQ_SHARED_STATE_T *local;
959	VCHIQ_HEADER_T *header;
960
961	local = state->local;
962
963	if ((VCHIQ_MSG_TYPE(msgid) != VCHIQ_MSG_RESUME) &&
964		(lmutex_lock_interruptible(&state->sync_mutex) != 0))
965		return VCHIQ_RETRY;
966
967	remote_event_wait(&local->sync_release);
968
969	rmb();
970
971	header = (VCHIQ_HEADER_T *)SLOT_DATA_FROM_INDEX(state,
972		local->slot_sync);
973
974	{
975		int oldmsgid = header->msgid;
976		if (oldmsgid != VCHIQ_MSGID_PADDING)
977			vchiq_log_error(vchiq_core_log_level,
978				"%d: qms - msgid %x, not PADDING",
979				state->id, oldmsgid);
980	}
981
982	if (service) {
983		int i, pos;
984
985		vchiq_log_info(vchiq_sync_log_level,
986			"%d: qms %s@%x,%x (%d->%d)", state->id,
987			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
988			(unsigned int)header, size,
989			VCHIQ_MSG_SRCPORT(msgid),
990			VCHIQ_MSG_DSTPORT(msgid));
991
992		for (i = 0, pos = 0; i < (unsigned int)count;
993			pos += elements[i++].size)
994			if (elements[i].size) {
995				if (vchiq_copy_from_user
996					(header->data + pos, elements[i].data,
997					(size_t) elements[i].size) !=
998					VCHIQ_SUCCESS) {
999					lmutex_unlock(&state->sync_mutex);
1000					VCHIQ_SERVICE_STATS_INC(service,
1001						error_count);
1002					return VCHIQ_ERROR;
1003				}
1004				if (i == 0) {
1005					if (vchiq_sync_log_level >=
1006						VCHIQ_LOG_TRACE)
1007						vchiq_log_dump_mem("Sent Sync",
1008							0, header->data + pos,
1009							min(64,
1010							elements[0].size));
1011				}
1012			}
1013
1014		VCHIQ_SERVICE_STATS_INC(service, ctrl_tx_count);
1015		VCHIQ_SERVICE_STATS_ADD(service, ctrl_tx_bytes, size);
1016	} else {
1017		vchiq_log_info(vchiq_sync_log_level,
1018			"%d: qms %s@%x,%x (%d->%d)", state->id,
1019			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
1020			(unsigned int)header, size,
1021			VCHIQ_MSG_SRCPORT(msgid),
1022			VCHIQ_MSG_DSTPORT(msgid));
1023		if (size != 0) {
1024			WARN_ON(!((count == 1) && (size == elements[0].size)));
1025			memcpy(header->data, elements[0].data,
1026				elements[0].size);
1027		}
1028		VCHIQ_STATS_INC(state, ctrl_tx_count);
1029	}
1030
1031	header->size = size;
1032	header->msgid = msgid;
1033
1034	if (vchiq_sync_log_level >= VCHIQ_LOG_TRACE) {
1035		int svc_fourcc;
1036
1037		svc_fourcc = service
1038			? service->base.fourcc
1039			: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
1040
1041		vchiq_log_trace(vchiq_sync_log_level,
1042			"Sent Sync Msg %s(%u) to %c%c%c%c s:%u d:%d len:%d",
1043			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
1044			VCHIQ_MSG_TYPE(msgid),
1045			VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
1046			VCHIQ_MSG_SRCPORT(msgid),
1047			VCHIQ_MSG_DSTPORT(msgid),
1048			size);
1049	}
1050
1051	/* Make sure the new header is visible to the peer. */
1052	wmb();
1053
1054	remote_event_signal(&state->remote->sync_trigger);
1055
1056	if (VCHIQ_MSG_TYPE(msgid) != VCHIQ_MSG_PAUSE)
1057		lmutex_unlock(&state->sync_mutex);
1058
1059	return VCHIQ_SUCCESS;
1060}
1061
1062static inline void
1063claim_slot(VCHIQ_SLOT_INFO_T *slot)
1064{
1065	slot->use_count++;
1066}
1067
1068static void
1069release_slot(VCHIQ_STATE_T *state, VCHIQ_SLOT_INFO_T *slot_info,
1070	VCHIQ_HEADER_T *header, VCHIQ_SERVICE_T *service)
1071{
1072	int release_count;
1073
1074	lmutex_lock(&state->recycle_mutex);
1075
1076	if (header) {
1077		int msgid = header->msgid;
1078		if (((msgid & VCHIQ_MSGID_CLAIMED) == 0) ||
1079			(service && service->closing)) {
1080			lmutex_unlock(&state->recycle_mutex);
1081			return;
1082		}
1083
1084		/* Rewrite the message header to prevent a double
1085		** release */
1086		header->msgid = msgid & ~VCHIQ_MSGID_CLAIMED;
1087	}
1088
1089	release_count = slot_info->release_count;
1090	slot_info->release_count = ++release_count;
1091
1092	if (release_count == slot_info->use_count) {
1093		int slot_queue_recycle;
1094		/* Add to the freed queue */
1095
1096		/* A read barrier is necessary here to prevent speculative
1097		** fetches of remote->slot_queue_recycle from overtaking the
1098		** mutex. */
1099		rmb();
1100
1101		slot_queue_recycle = state->remote->slot_queue_recycle;
1102		state->remote->slot_queue[slot_queue_recycle &
1103			VCHIQ_SLOT_QUEUE_MASK] =
1104			SLOT_INDEX_FROM_INFO(state, slot_info);
1105		state->remote->slot_queue_recycle = slot_queue_recycle + 1;
1106		vchiq_log_info(vchiq_core_log_level,
1107			"%d: release_slot %d - recycle->%x",
1108			state->id, SLOT_INDEX_FROM_INFO(state, slot_info),
1109			state->remote->slot_queue_recycle);
1110
1111		/* A write barrier is necessary, but remote_event_signal
1112		** contains one. */
1113		remote_event_signal(&state->remote->recycle);
1114	}
1115
1116	lmutex_unlock(&state->recycle_mutex);
1117}
1118
1119/* Called by the slot handler - don't hold the bulk mutex */
1120static VCHIQ_STATUS_T
1121notify_bulks(VCHIQ_SERVICE_T *service, VCHIQ_BULK_QUEUE_T *queue,
1122	int retry_poll)
1123{
1124	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
1125
1126	vchiq_log_trace(vchiq_core_log_level,
1127		"%d: nb:%d %cx - p=%x rn=%x r=%x",
1128		service->state->id, service->localport,
1129		(queue == &service->bulk_tx) ? 't' : 'r',
1130		queue->process, queue->remote_notify, queue->remove);
1131
1132	if (service->state->is_master) {
1133		while (queue->remote_notify != queue->process) {
1134			VCHIQ_BULK_T *bulk =
1135				&queue->bulks[BULK_INDEX(queue->remote_notify)];
1136			int msgtype = (bulk->dir == VCHIQ_BULK_TRANSMIT) ?
1137				VCHIQ_MSG_BULK_RX_DONE : VCHIQ_MSG_BULK_TX_DONE;
1138			int msgid = VCHIQ_MAKE_MSG(msgtype, service->localport,
1139				service->remoteport);
1140			VCHIQ_ELEMENT_T element = { &bulk->actual, 4 };
1141			/* Only reply to non-dummy bulk requests */
1142			if (bulk->remote_data) {
1143				status = queue_message(service->state, NULL,
1144					msgid, &element, 1, 4, 0);
1145				if (status != VCHIQ_SUCCESS)
1146					break;
1147			}
1148			queue->remote_notify++;
1149		}
1150	} else {
1151		queue->remote_notify = queue->process;
1152	}
1153
1154	if (status == VCHIQ_SUCCESS) {
1155		while (queue->remove != queue->remote_notify) {
1156			VCHIQ_BULK_T *bulk =
1157				&queue->bulks[BULK_INDEX(queue->remove)];
1158
1159			/* Only generate callbacks for non-dummy bulk
1160			** requests, and non-terminated services */
1161			if (bulk->data && service->instance) {
1162				if (bulk->actual != VCHIQ_BULK_ACTUAL_ABORTED) {
1163					if (bulk->dir == VCHIQ_BULK_TRANSMIT) {
1164						VCHIQ_SERVICE_STATS_INC(service,
1165							bulk_tx_count);
1166						VCHIQ_SERVICE_STATS_ADD(service,
1167							bulk_tx_bytes,
1168							bulk->actual);
1169					} else {
1170						VCHIQ_SERVICE_STATS_INC(service,
1171							bulk_rx_count);
1172						VCHIQ_SERVICE_STATS_ADD(service,
1173							bulk_rx_bytes,
1174							bulk->actual);
1175					}
1176				} else {
1177					VCHIQ_SERVICE_STATS_INC(service,
1178						bulk_aborted_count);
1179				}
1180				if (bulk->mode == VCHIQ_BULK_MODE_BLOCKING) {
1181					struct bulk_waiter *waiter;
1182					spin_lock(&bulk_waiter_spinlock);
1183					waiter = bulk->userdata;
1184					if (waiter) {
1185						waiter->actual = bulk->actual;
1186						up(&waiter->event);
1187					}
1188					spin_unlock(&bulk_waiter_spinlock);
1189				} else if (bulk->mode ==
1190					VCHIQ_BULK_MODE_CALLBACK) {
1191					VCHIQ_REASON_T reason = (bulk->dir ==
1192						VCHIQ_BULK_TRANSMIT) ?
1193						((bulk->actual ==
1194						VCHIQ_BULK_ACTUAL_ABORTED) ?
1195						VCHIQ_BULK_TRANSMIT_ABORTED :
1196						VCHIQ_BULK_TRANSMIT_DONE) :
1197						((bulk->actual ==
1198						VCHIQ_BULK_ACTUAL_ABORTED) ?
1199						VCHIQ_BULK_RECEIVE_ABORTED :
1200						VCHIQ_BULK_RECEIVE_DONE);
1201					status = make_service_callback(service,
1202						reason,	NULL, bulk->userdata);
1203					if (status == VCHIQ_RETRY)
1204						break;
1205				}
1206			}
1207
1208			queue->remove++;
1209			up(&service->bulk_remove_event);
1210		}
1211		if (!retry_poll)
1212			status = VCHIQ_SUCCESS;
1213	}
1214
1215	if (status == VCHIQ_RETRY)
1216		request_poll(service->state, service,
1217			(queue == &service->bulk_tx) ?
1218			VCHIQ_POLL_TXNOTIFY : VCHIQ_POLL_RXNOTIFY);
1219
1220	return status;
1221}
1222
1223/* Called by the slot handler thread */
1224static void
1225poll_services(VCHIQ_STATE_T *state)
1226{
1227	int group, i;
1228
1229	for (group = 0; group < BITSET_SIZE(state->unused_service); group++) {
1230		uint32_t flags;
1231		flags = atomic_xchg(&state->poll_services[group], 0);
1232		for (i = 0; flags; i++) {
1233			if (flags & (1 << i)) {
1234				VCHIQ_SERVICE_T *service =
1235					find_service_by_port(state,
1236						(group<<5) + i);
1237				uint32_t service_flags;
1238				flags &= ~(1 << i);
1239				if (!service)
1240					continue;
1241				service_flags =
1242					atomic_xchg(&service->poll_flags, 0);
1243				if (service_flags &
1244					(1 << VCHIQ_POLL_REMOVE)) {
1245					vchiq_log_info(vchiq_core_log_level,
1246						"%d: ps - remove %d<->%d",
1247						state->id, service->localport,
1248						service->remoteport);
1249
1250					/* Make it look like a client, because
1251					   it must be removed and not left in
1252					   the LISTENING state. */
1253					service->public_fourcc =
1254						VCHIQ_FOURCC_INVALID;
1255
1256					if (vchiq_close_service_internal(
1257						service, 0/*!close_recvd*/) !=
1258						VCHIQ_SUCCESS)
1259						request_poll(state, service,
1260							VCHIQ_POLL_REMOVE);
1261				} else if (service_flags &
1262					(1 << VCHIQ_POLL_TERMINATE)) {
1263					vchiq_log_info(vchiq_core_log_level,
1264						"%d: ps - terminate %d<->%d",
1265						state->id, service->localport,
1266						service->remoteport);
1267					if (vchiq_close_service_internal(
1268						service, 0/*!close_recvd*/) !=
1269						VCHIQ_SUCCESS)
1270						request_poll(state, service,
1271							VCHIQ_POLL_TERMINATE);
1272				}
1273				if (service_flags & (1 << VCHIQ_POLL_TXNOTIFY))
1274					notify_bulks(service,
1275						&service->bulk_tx,
1276						1/*retry_poll*/);
1277				if (service_flags & (1 << VCHIQ_POLL_RXNOTIFY))
1278					notify_bulks(service,
1279						&service->bulk_rx,
1280						1/*retry_poll*/);
1281				unlock_service(service);
1282			}
1283		}
1284	}
1285}
1286
1287/* Called by the slot handler or application threads, holding the bulk mutex. */
1288static int
1289resolve_bulks(VCHIQ_SERVICE_T *service, VCHIQ_BULK_QUEUE_T *queue)
1290{
1291	VCHIQ_STATE_T *state = service->state;
1292	int resolved = 0;
1293	int rc;
1294
1295	while ((queue->process != queue->local_insert) &&
1296		(queue->process != queue->remote_insert)) {
1297		VCHIQ_BULK_T *bulk = &queue->bulks[BULK_INDEX(queue->process)];
1298
1299		vchiq_log_trace(vchiq_core_log_level,
1300			"%d: rb:%d %cx - li=%x ri=%x p=%x",
1301			state->id, service->localport,
1302			(queue == &service->bulk_tx) ? 't' : 'r',
1303			queue->local_insert, queue->remote_insert,
1304			queue->process);
1305
1306		WARN_ON(!((int)(queue->local_insert - queue->process) > 0));
1307		WARN_ON(!((int)(queue->remote_insert - queue->process) > 0));
1308
1309		rc = lmutex_lock_interruptible(&state->bulk_transfer_mutex);
1310		if (rc != 0)
1311			break;
1312
1313		vchiq_transfer_bulk(bulk);
1314		lmutex_unlock(&state->bulk_transfer_mutex);
1315
1316		if (vchiq_core_msg_log_level >= VCHIQ_LOG_INFO) {
1317			const char *header = (queue == &service->bulk_tx) ?
1318				"Send Bulk to" : "Recv Bulk from";
1319			if (bulk->actual != VCHIQ_BULK_ACTUAL_ABORTED)
1320				vchiq_log_info(vchiq_core_msg_log_level,
1321					"%s %c%c%c%c d:%d len:%d %x<->%x",
1322					header,
1323					VCHIQ_FOURCC_AS_4CHARS(
1324						service->base.fourcc),
1325					service->remoteport,
1326					bulk->size,
1327					(unsigned int)bulk->data,
1328					(unsigned int)bulk->remote_data);
1329			else
1330				vchiq_log_info(vchiq_core_msg_log_level,
1331					"%s %c%c%c%c d:%d ABORTED - tx len:%d,"
1332					" rx len:%d %x<->%x",
1333					header,
1334					VCHIQ_FOURCC_AS_4CHARS(
1335						service->base.fourcc),
1336					service->remoteport,
1337					bulk->size,
1338					bulk->remote_size,
1339					(unsigned int)bulk->data,
1340					(unsigned int)bulk->remote_data);
1341		}
1342
1343		vchiq_complete_bulk(bulk);
1344		queue->process++;
1345		resolved++;
1346	}
1347	return resolved;
1348}
1349
1350/* Called with the bulk_mutex held */
1351static void
1352abort_outstanding_bulks(VCHIQ_SERVICE_T *service, VCHIQ_BULK_QUEUE_T *queue)
1353{
1354	int is_tx = (queue == &service->bulk_tx);
1355	vchiq_log_trace(vchiq_core_log_level,
1356		"%d: aob:%d %cx - li=%x ri=%x p=%x",
1357		service->state->id, service->localport, is_tx ? 't' : 'r',
1358		queue->local_insert, queue->remote_insert, queue->process);
1359
1360	WARN_ON(!((int)(queue->local_insert - queue->process) >= 0));
1361	WARN_ON(!((int)(queue->remote_insert - queue->process) >= 0));
1362
1363	while ((queue->process != queue->local_insert) ||
1364		(queue->process != queue->remote_insert)) {
1365		VCHIQ_BULK_T *bulk = &queue->bulks[BULK_INDEX(queue->process)];
1366
1367		if (queue->process == queue->remote_insert) {
1368			/* fabricate a matching dummy bulk */
1369			bulk->remote_data = NULL;
1370			bulk->remote_size = 0;
1371			queue->remote_insert++;
1372		}
1373
1374		if (queue->process != queue->local_insert) {
1375			vchiq_complete_bulk(bulk);
1376
1377			vchiq_log_info(vchiq_core_msg_log_level,
1378				"%s %c%c%c%c d:%d ABORTED - tx len:%d, "
1379				"rx len:%d",
1380				is_tx ? "Send Bulk to" : "Recv Bulk from",
1381				VCHIQ_FOURCC_AS_4CHARS(service->base.fourcc),
1382				service->remoteport,
1383				bulk->size,
1384				bulk->remote_size);
1385		} else {
1386			/* fabricate a matching dummy bulk */
1387			bulk->data = NULL;
1388			bulk->size = 0;
1389			bulk->actual = VCHIQ_BULK_ACTUAL_ABORTED;
1390			bulk->dir = is_tx ? VCHIQ_BULK_TRANSMIT :
1391				VCHIQ_BULK_RECEIVE;
1392			queue->local_insert++;
1393		}
1394
1395		queue->process++;
1396	}
1397}
1398
1399/* Called from the slot handler thread */
1400static void
1401pause_bulks(VCHIQ_STATE_T *state)
1402{
1403	if (unlikely(atomic_inc_return(&pause_bulks_count) != 1)) {
1404		WARN_ON_ONCE(1);
1405		atomic_set(&pause_bulks_count, 1);
1406		return;
1407	}
1408
1409	/* Block bulk transfers from all services */
1410	lmutex_lock(&state->bulk_transfer_mutex);
1411}
1412
1413/* Called from the slot handler thread */
1414static void
1415resume_bulks(VCHIQ_STATE_T *state)
1416{
1417	int i;
1418	if (unlikely(atomic_dec_return(&pause_bulks_count) != 0)) {
1419		WARN_ON_ONCE(1);
1420		atomic_set(&pause_bulks_count, 0);
1421		return;
1422	}
1423
1424	/* Allow bulk transfers from all services */
1425	lmutex_unlock(&state->bulk_transfer_mutex);
1426
1427	if (state->deferred_bulks == 0)
1428		return;
1429
1430	/* Deal with any bulks which had to be deferred due to being in
1431	 * paused state.  Don't try to match up to number of deferred bulks
1432	 * in case we've had something come and close the service in the
1433	 * interim - just process all bulk queues for all services */
1434	vchiq_log_info(vchiq_core_log_level, "%s: processing %d deferred bulks",
1435		__func__, state->deferred_bulks);
1436
1437	for (i = 0; i < state->unused_service; i++) {
1438		VCHIQ_SERVICE_T *service = state->services[i];
1439		int resolved_rx = 0;
1440		int resolved_tx = 0;
1441		if (!service || (service->srvstate != VCHIQ_SRVSTATE_OPEN))
1442			continue;
1443
1444		lmutex_lock(&service->bulk_mutex);
1445		resolved_rx = resolve_bulks(service, &service->bulk_rx);
1446		resolved_tx = resolve_bulks(service, &service->bulk_tx);
1447		lmutex_unlock(&service->bulk_mutex);
1448		if (resolved_rx)
1449			notify_bulks(service, &service->bulk_rx, 1);
1450		if (resolved_tx)
1451			notify_bulks(service, &service->bulk_tx, 1);
1452	}
1453	state->deferred_bulks = 0;
1454}
1455
1456static int
1457parse_open(VCHIQ_STATE_T *state, VCHIQ_HEADER_T *header)
1458{
1459	VCHIQ_SERVICE_T *service = NULL;
1460	int msgid, size;
1461	int type;
1462	unsigned int localport, remoteport;
1463
1464	msgid = header->msgid;
1465	size = header->size;
1466	type = VCHIQ_MSG_TYPE(msgid);
1467	localport = VCHIQ_MSG_DSTPORT(msgid);
1468	remoteport = VCHIQ_MSG_SRCPORT(msgid);
1469	if (size >= sizeof(struct vchiq_open_payload)) {
1470		const struct vchiq_open_payload *payload =
1471			(struct vchiq_open_payload *)header->data;
1472		unsigned int fourcc;
1473
1474		fourcc = payload->fourcc;
1475		vchiq_log_info(vchiq_core_log_level,
1476			"%d: prs OPEN@%x (%d->'%c%c%c%c')",
1477			state->id, (unsigned int)header,
1478			localport,
1479			VCHIQ_FOURCC_AS_4CHARS(fourcc));
1480
1481		service = get_listening_service(state, fourcc);
1482
1483		if (service) {
1484			/* A matching service exists */
1485			short v = payload->version;
1486			short version_min = payload->version_min;
1487			if ((service->version < version_min) ||
1488				(v < service->version_min)) {
1489				/* Version mismatch */
1490				vchiq_loud_error_header();
1491				vchiq_loud_error("%d: service %d (%c%c%c%c) "
1492					"version mismatch - local (%d, min %d)"
1493					" vs. remote (%d, min %d)",
1494					state->id, service->localport,
1495					VCHIQ_FOURCC_AS_4CHARS(fourcc),
1496					service->version, service->version_min,
1497					v, version_min);
1498				vchiq_loud_error_footer();
1499				unlock_service(service);
1500				goto fail_open;
1501			}
1502			service->peer_version = v;
1503
1504			if (service->srvstate == VCHIQ_SRVSTATE_LISTENING) {
1505				struct vchiq_openack_payload ack_payload = {
1506					service->version
1507				};
1508				VCHIQ_ELEMENT_T body = {
1509					&ack_payload,
1510					sizeof(ack_payload)
1511				};
1512
1513				/* Acknowledge the OPEN */
1514				if (service->sync) {
1515					if (queue_message_sync(state, NULL,
1516						VCHIQ_MAKE_MSG(
1517							VCHIQ_MSG_OPENACK,
1518							service->localport,
1519							remoteport),
1520						&body, 1, sizeof(ack_payload),
1521						0) == VCHIQ_RETRY)
1522						goto bail_not_ready;
1523				} else {
1524					if (queue_message(state, NULL,
1525						VCHIQ_MAKE_MSG(
1526							VCHIQ_MSG_OPENACK,
1527							service->localport,
1528							remoteport),
1529						&body, 1, sizeof(ack_payload),
1530						0) == VCHIQ_RETRY)
1531						goto bail_not_ready;
1532				}
1533
1534				/* The service is now open */
1535				vchiq_set_service_state(service,
1536					service->sync ? VCHIQ_SRVSTATE_OPENSYNC
1537					: VCHIQ_SRVSTATE_OPEN);
1538			}
1539
1540			service->remoteport = remoteport;
1541			service->client_id = ((int *)header->data)[1];
1542			if (make_service_callback(service, VCHIQ_SERVICE_OPENED,
1543				NULL, NULL) == VCHIQ_RETRY) {
1544				/* Bail out if not ready */
1545				service->remoteport = VCHIQ_PORT_FREE;
1546				goto bail_not_ready;
1547			}
1548
1549			/* Success - the message has been dealt with */
1550			unlock_service(service);
1551			return 1;
1552		}
1553	}
1554
1555fail_open:
1556	/* No available service, or an invalid request - send a CLOSE */
1557	if (queue_message(state, NULL,
1558		VCHIQ_MAKE_MSG(VCHIQ_MSG_CLOSE, 0, VCHIQ_MSG_SRCPORT(msgid)),
1559		NULL, 0, 0, 0) == VCHIQ_RETRY)
1560		goto bail_not_ready;
1561
1562	return 1;
1563
1564bail_not_ready:
1565	unlock_service(service);
1566
1567	return 0;
1568}
1569
1570/* Called by the slot handler thread */
1571static void
1572parse_rx_slots(VCHIQ_STATE_T *state)
1573{
1574	VCHIQ_SHARED_STATE_T *remote = state->remote;
1575	VCHIQ_SERVICE_T *service = NULL;
1576	int tx_pos;
1577	DEBUG_INITIALISE(state->local)
1578
1579	tx_pos = remote->tx_pos;
1580
1581	while (state->rx_pos != tx_pos) {
1582		VCHIQ_HEADER_T *header;
1583		int msgid, size;
1584		int type;
1585		unsigned int localport, remoteport;
1586
1587		DEBUG_TRACE(PARSE_LINE);
1588		if (!state->rx_data) {
1589			int rx_index;
1590			WARN_ON(!((state->rx_pos & VCHIQ_SLOT_MASK) == 0));
1591			rx_index = remote->slot_queue[
1592				SLOT_QUEUE_INDEX_FROM_POS(state->rx_pos) &
1593				VCHIQ_SLOT_QUEUE_MASK];
1594			state->rx_data = (char *)SLOT_DATA_FROM_INDEX(state,
1595				rx_index);
1596			state->rx_info = SLOT_INFO_FROM_INDEX(state, rx_index);
1597
1598			/* Initialise use_count to one, and increment
1599			** release_count at the end of the slot to avoid
1600			** releasing the slot prematurely. */
1601			state->rx_info->use_count = 1;
1602			state->rx_info->release_count = 0;
1603		}
1604
1605		header = (VCHIQ_HEADER_T *)(state->rx_data +
1606			(state->rx_pos & VCHIQ_SLOT_MASK));
1607		DEBUG_VALUE(PARSE_HEADER, (int)header);
1608		msgid = header->msgid;
1609		DEBUG_VALUE(PARSE_MSGID, msgid);
1610		size = header->size;
1611		type = VCHIQ_MSG_TYPE(msgid);
1612		localport = VCHIQ_MSG_DSTPORT(msgid);
1613		remoteport = VCHIQ_MSG_SRCPORT(msgid);
1614
1615		if (type != VCHIQ_MSG_DATA)
1616			VCHIQ_STATS_INC(state, ctrl_rx_count);
1617
1618		switch (type) {
1619		case VCHIQ_MSG_OPENACK:
1620		case VCHIQ_MSG_CLOSE:
1621		case VCHIQ_MSG_DATA:
1622		case VCHIQ_MSG_BULK_RX:
1623		case VCHIQ_MSG_BULK_TX:
1624		case VCHIQ_MSG_BULK_RX_DONE:
1625		case VCHIQ_MSG_BULK_TX_DONE:
1626			service = find_service_by_port(state, localport);
1627			if ((!service || service->remoteport != remoteport) &&
1628				(localport == 0) &&
1629				(type == VCHIQ_MSG_CLOSE)) {
1630				/* This could be a CLOSE from a client which
1631				   hadn't yet received the OPENACK - look for
1632				   the connected service */
1633				if (service)
1634					unlock_service(service);
1635				service = get_connected_service(state,
1636					remoteport);
1637				if (service)
1638					vchiq_log_warning(vchiq_core_log_level,
1639						"%d: prs %s@%x (%d->%d) - "
1640						"found connected service %d",
1641						state->id, msg_type_str(type),
1642						(unsigned int)header,
1643						remoteport, localport,
1644						service->localport);
1645			}
1646
1647			if (!service) {
1648				vchiq_log_error(vchiq_core_log_level,
1649					"%d: prs %s@%x (%d->%d) - "
1650					"invalid/closed service %d",
1651					state->id, msg_type_str(type),
1652					(unsigned int)header,
1653					remoteport, localport, localport);
1654				goto skip_message;
1655			}
1656			break;
1657		default:
1658			break;
1659		}
1660
1661		if (vchiq_core_msg_log_level >= VCHIQ_LOG_INFO) {
1662			int svc_fourcc;
1663
1664			svc_fourcc = service
1665				? service->base.fourcc
1666				: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
1667			vchiq_log_info(vchiq_core_msg_log_level,
1668				"Rcvd Msg %s(%u) from %c%c%c%c s:%d d:%d "
1669				"len:%d",
1670				msg_type_str(type), type,
1671				VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
1672				remoteport, localport, size);
1673			if (size > 0)
1674				vchiq_log_dump_mem("Rcvd", 0, header->data,
1675					min(64, size));
1676		}
1677
1678		if (((unsigned int)header & VCHIQ_SLOT_MASK) + calc_stride(size)
1679			> VCHIQ_SLOT_SIZE) {
1680			vchiq_log_error(vchiq_core_log_level,
1681				"header %x (msgid %x) - size %x too big for "
1682				"slot",
1683				(unsigned int)header, (unsigned int)msgid,
1684				(unsigned int)size);
1685			WARN(1, "oversized for slot\n");
1686		}
1687
1688		switch (type) {
1689		case VCHIQ_MSG_OPEN:
1690			WARN_ON(!(VCHIQ_MSG_DSTPORT(msgid) == 0));
1691			if (!parse_open(state, header))
1692				goto bail_not_ready;
1693			break;
1694		case VCHIQ_MSG_OPENACK:
1695			if (size >= sizeof(struct vchiq_openack_payload)) {
1696				const struct vchiq_openack_payload *payload =
1697					(struct vchiq_openack_payload *)
1698					header->data;
1699				service->peer_version = payload->version;
1700			}
1701			vchiq_log_info(vchiq_core_log_level,
1702				"%d: prs OPENACK@%x,%x (%d->%d) v:%d",
1703				state->id, (unsigned int)header, size,
1704				remoteport, localport, service->peer_version);
1705			if (service->srvstate ==
1706				VCHIQ_SRVSTATE_OPENING) {
1707				service->remoteport = remoteport;
1708				vchiq_set_service_state(service,
1709					VCHIQ_SRVSTATE_OPEN);
1710				up(&service->remove_event);
1711			} else
1712				vchiq_log_error(vchiq_core_log_level,
1713					"OPENACK received in state %s",
1714					srvstate_names[service->srvstate]);
1715			break;
1716		case VCHIQ_MSG_CLOSE:
1717			WARN_ON(size != 0); /* There should be no data */
1718
1719			vchiq_log_info(vchiq_core_log_level,
1720				"%d: prs CLOSE@%x (%d->%d)",
1721				state->id, (unsigned int)header,
1722				remoteport, localport);
1723
1724			mark_service_closing_internal(service, 1);
1725
1726			if (vchiq_close_service_internal(service,
1727				1/*close_recvd*/) == VCHIQ_RETRY)
1728				goto bail_not_ready;
1729
1730			vchiq_log_info(vchiq_core_log_level,
1731				"Close Service %c%c%c%c s:%u d:%d",
1732				VCHIQ_FOURCC_AS_4CHARS(service->base.fourcc),
1733				service->localport,
1734				service->remoteport);
1735			break;
1736		case VCHIQ_MSG_DATA:
1737			vchiq_log_trace(vchiq_core_log_level,
1738				"%d: prs DATA@%x,%x (%d->%d)",
1739				state->id, (unsigned int)header, size,
1740				remoteport, localport);
1741
1742			if ((service->remoteport == remoteport)
1743				&& (service->srvstate ==
1744				VCHIQ_SRVSTATE_OPEN)) {
1745				header->msgid = msgid | VCHIQ_MSGID_CLAIMED;
1746				claim_slot(state->rx_info);
1747				DEBUG_TRACE(PARSE_LINE);
1748				if (make_service_callback(service,
1749					VCHIQ_MESSAGE_AVAILABLE, header,
1750					NULL) == VCHIQ_RETRY) {
1751					DEBUG_TRACE(PARSE_LINE);
1752					goto bail_not_ready;
1753				}
1754				VCHIQ_SERVICE_STATS_INC(service, ctrl_rx_count);
1755				VCHIQ_SERVICE_STATS_ADD(service, ctrl_rx_bytes,
1756					size);
1757			} else {
1758				VCHIQ_STATS_INC(state, error_count);
1759			}
1760			break;
1761		case VCHIQ_MSG_CONNECT:
1762			vchiq_log_info(vchiq_core_log_level,
1763				"%d: prs CONNECT@%x",
1764				state->id, (unsigned int)header);
1765			up(&state->connect);
1766			break;
1767		case VCHIQ_MSG_BULK_RX:
1768		case VCHIQ_MSG_BULK_TX: {
1769			VCHIQ_BULK_QUEUE_T *queue;
1770			WARN_ON(!state->is_master);
1771			queue = (type == VCHIQ_MSG_BULK_RX) ?
1772				&service->bulk_tx : &service->bulk_rx;
1773			if ((service->remoteport == remoteport)
1774				&& (service->srvstate ==
1775				VCHIQ_SRVSTATE_OPEN)) {
1776				VCHIQ_BULK_T *bulk;
1777				int resolved = 0;
1778
1779				DEBUG_TRACE(PARSE_LINE);
1780				if (lmutex_lock_interruptible(
1781					&service->bulk_mutex) != 0) {
1782					DEBUG_TRACE(PARSE_LINE);
1783					goto bail_not_ready;
1784				}
1785
1786				WARN_ON(!(queue->remote_insert < queue->remove +
1787					VCHIQ_NUM_SERVICE_BULKS));
1788				bulk = &queue->bulks[
1789					BULK_INDEX(queue->remote_insert)];
1790				bulk->remote_data =
1791					(void *)((int *)header->data)[0];
1792				bulk->remote_size = ((int *)header->data)[1];
1793				wmb();
1794
1795				vchiq_log_info(vchiq_core_log_level,
1796					"%d: prs %s@%x (%d->%d) %x@%x",
1797					state->id, msg_type_str(type),
1798					(unsigned int)header,
1799					remoteport, localport,
1800					bulk->remote_size,
1801					(unsigned int)bulk->remote_data);
1802
1803				queue->remote_insert++;
1804
1805				if (atomic_read(&pause_bulks_count)) {
1806					state->deferred_bulks++;
1807					vchiq_log_info(vchiq_core_log_level,
1808						"%s: deferring bulk (%d)",
1809						__func__,
1810						state->deferred_bulks);
1811					if (state->conn_state !=
1812						VCHIQ_CONNSTATE_PAUSE_SENT)
1813						vchiq_log_error(
1814							vchiq_core_log_level,
1815							"%s: bulks paused in "
1816							"unexpected state %s",
1817							__func__,
1818							conn_state_names[
1819							state->conn_state]);
1820				} else if (state->conn_state ==
1821					VCHIQ_CONNSTATE_CONNECTED) {
1822					DEBUG_TRACE(PARSE_LINE);
1823					resolved = resolve_bulks(service,
1824						queue);
1825				}
1826
1827				lmutex_unlock(&service->bulk_mutex);
1828				if (resolved)
1829					notify_bulks(service, queue,
1830						1/*retry_poll*/);
1831			}
1832		} break;
1833		case VCHIQ_MSG_BULK_RX_DONE:
1834		case VCHIQ_MSG_BULK_TX_DONE:
1835			WARN_ON(state->is_master);
1836			if ((service->remoteport == remoteport)
1837				&& (service->srvstate !=
1838				VCHIQ_SRVSTATE_FREE)) {
1839				VCHIQ_BULK_QUEUE_T *queue;
1840				VCHIQ_BULK_T *bulk;
1841
1842				queue = (type == VCHIQ_MSG_BULK_RX_DONE) ?
1843					&service->bulk_rx : &service->bulk_tx;
1844
1845				DEBUG_TRACE(PARSE_LINE);
1846				if (lmutex_lock_interruptible(
1847					&service->bulk_mutex) != 0) {
1848					DEBUG_TRACE(PARSE_LINE);
1849					goto bail_not_ready;
1850				}
1851				if ((int)(queue->remote_insert -
1852					queue->local_insert) >= 0) {
1853					vchiq_log_error(vchiq_core_log_level,
1854						"%d: prs %s@%x (%d->%d) "
1855						"unexpected (ri=%d,li=%d)",
1856						state->id, msg_type_str(type),
1857						(unsigned int)header,
1858						remoteport, localport,
1859						queue->remote_insert,
1860						queue->local_insert);
1861					lmutex_unlock(&service->bulk_mutex);
1862					break;
1863				}
1864
1865				BUG_ON(queue->process == queue->local_insert);
1866				BUG_ON(queue->process != queue->remote_insert);
1867
1868				bulk = &queue->bulks[
1869					BULK_INDEX(queue->remote_insert)];
1870				bulk->actual = *(int *)header->data;
1871				queue->remote_insert++;
1872
1873				vchiq_log_info(vchiq_core_log_level,
1874					"%d: prs %s@%x (%d->%d) %x@%x",
1875					state->id, msg_type_str(type),
1876					(unsigned int)header,
1877					remoteport, localport,
1878					bulk->actual, (unsigned int)bulk->data);
1879
1880				vchiq_log_trace(vchiq_core_log_level,
1881					"%d: prs:%d %cx li=%x ri=%x p=%x",
1882					state->id, localport,
1883					(type == VCHIQ_MSG_BULK_RX_DONE) ?
1884						'r' : 't',
1885					queue->local_insert,
1886					queue->remote_insert, queue->process);
1887
1888				DEBUG_TRACE(PARSE_LINE);
1889				WARN_ON(queue->process == queue->local_insert);
1890				vchiq_complete_bulk(bulk);
1891				queue->process++;
1892				lmutex_unlock(&service->bulk_mutex);
1893				DEBUG_TRACE(PARSE_LINE);
1894				notify_bulks(service, queue, 1/*retry_poll*/);
1895				DEBUG_TRACE(PARSE_LINE);
1896			}
1897			break;
1898		case VCHIQ_MSG_PADDING:
1899			vchiq_log_trace(vchiq_core_log_level,
1900				"%d: prs PADDING@%x,%x",
1901				state->id, (unsigned int)header, size);
1902			break;
1903		case VCHIQ_MSG_PAUSE:
1904			/* If initiated, signal the application thread */
1905			vchiq_log_trace(vchiq_core_log_level,
1906				"%d: prs PAUSE@%x,%x",
1907				state->id, (unsigned int)header, size);
1908			if (state->conn_state == VCHIQ_CONNSTATE_PAUSED) {
1909				vchiq_log_error(vchiq_core_log_level,
1910					"%d: PAUSE received in state PAUSED",
1911					state->id);
1912				break;
1913			}
1914			if (state->conn_state != VCHIQ_CONNSTATE_PAUSE_SENT) {
1915				/* Send a PAUSE in response */
1916				if (queue_message(state, NULL,
1917					VCHIQ_MAKE_MSG(VCHIQ_MSG_PAUSE, 0, 0),
1918					NULL, 0, 0, 0) == VCHIQ_RETRY)
1919					goto bail_not_ready;
1920				if (state->is_master)
1921					pause_bulks(state);
1922			}
1923			/* At this point slot_mutex is held */
1924			vchiq_set_conn_state(state, VCHIQ_CONNSTATE_PAUSED);
1925			vchiq_platform_paused(state);
1926			break;
1927		case VCHIQ_MSG_RESUME:
1928			vchiq_log_trace(vchiq_core_log_level,
1929				"%d: prs RESUME@%x,%x",
1930				state->id, (unsigned int)header, size);
1931			/* Release the slot mutex */
1932			lmutex_unlock(&state->slot_mutex);
1933			if (state->is_master)
1934				resume_bulks(state);
1935			vchiq_set_conn_state(state, VCHIQ_CONNSTATE_CONNECTED);
1936			vchiq_platform_resumed(state);
1937			break;
1938
1939		case VCHIQ_MSG_REMOTE_USE:
1940			vchiq_on_remote_use(state);
1941			break;
1942		case VCHIQ_MSG_REMOTE_RELEASE:
1943			vchiq_on_remote_release(state);
1944			break;
1945		case VCHIQ_MSG_REMOTE_USE_ACTIVE:
1946			vchiq_on_remote_use_active(state);
1947			break;
1948
1949		default:
1950			vchiq_log_error(vchiq_core_log_level,
1951				"%d: prs invalid msgid %x@%x,%x",
1952				state->id, msgid, (unsigned int)header, size);
1953			WARN(1, "invalid message\n");
1954			break;
1955		}
1956
1957skip_message:
1958		if (service) {
1959			unlock_service(service);
1960			service = NULL;
1961		}
1962
1963		state->rx_pos += calc_stride(size);
1964
1965		DEBUG_TRACE(PARSE_LINE);
1966		/* Perform some housekeeping when the end of the slot is
1967		** reached. */
1968		if ((state->rx_pos & VCHIQ_SLOT_MASK) == 0) {
1969			/* Remove the extra reference count. */
1970			release_slot(state, state->rx_info, NULL, NULL);
1971			state->rx_data = NULL;
1972		}
1973	}
1974
1975bail_not_ready:
1976	if (service)
1977		unlock_service(service);
1978}
1979
1980/* Called by the slot handler thread */
1981int slot_handler_func(void *v);
1982int
1983slot_handler_func(void *v)
1984{
1985	VCHIQ_STATE_T *state = (VCHIQ_STATE_T *) v;
1986	VCHIQ_SHARED_STATE_T *local = state->local;
1987	DEBUG_INITIALISE(local)
1988
1989	while (1) {
1990		DEBUG_COUNT(SLOT_HANDLER_COUNT);
1991		DEBUG_TRACE(SLOT_HANDLER_LINE);
1992		remote_event_wait(&local->trigger);
1993
1994		rmb();
1995
1996		DEBUG_TRACE(SLOT_HANDLER_LINE);
1997		if (state->poll_needed) {
1998			/* Check if we need to suspend - may change our
1999			 * conn_state */
2000			vchiq_platform_check_suspend(state);
2001
2002			state->poll_needed = 0;
2003
2004			/* Handle service polling and other rare conditions here
2005			** out of the mainline code */
2006			switch (state->conn_state) {
2007			case VCHIQ_CONNSTATE_CONNECTED:
2008				/* Poll the services as requested */
2009				poll_services(state);
2010				break;
2011
2012			case VCHIQ_CONNSTATE_PAUSING:
2013				if (state->is_master)
2014					pause_bulks(state);
2015				if (queue_message(state, NULL,
2016					VCHIQ_MAKE_MSG(VCHIQ_MSG_PAUSE, 0, 0),
2017					NULL, 0, 0, 0) != VCHIQ_RETRY) {
2018					vchiq_set_conn_state(state,
2019						VCHIQ_CONNSTATE_PAUSE_SENT);
2020				} else {
2021					if (state->is_master)
2022						resume_bulks(state);
2023					/* Retry later */
2024					state->poll_needed = 1;
2025				}
2026				break;
2027
2028			case VCHIQ_CONNSTATE_PAUSED:
2029				vchiq_platform_resume(state);
2030				break;
2031
2032			case VCHIQ_CONNSTATE_RESUMING:
2033				if (queue_message(state, NULL,
2034					VCHIQ_MAKE_MSG(VCHIQ_MSG_RESUME, 0, 0),
2035					NULL, 0, 0, 0) != VCHIQ_RETRY) {
2036					if (state->is_master)
2037						resume_bulks(state);
2038					vchiq_set_conn_state(state,
2039						VCHIQ_CONNSTATE_CONNECTED);
2040					vchiq_platform_resumed(state);
2041				} else {
2042					/* This should really be impossible,
2043					** since the PAUSE should have flushed
2044					** through outstanding messages. */
2045					vchiq_log_error(vchiq_core_log_level,
2046						"Failed to send RESUME "
2047						"message");
2048					BUG();
2049				}
2050				break;
2051
2052			case VCHIQ_CONNSTATE_PAUSE_TIMEOUT:
2053			case VCHIQ_CONNSTATE_RESUME_TIMEOUT:
2054				vchiq_platform_handle_timeout(state);
2055				break;
2056			default:
2057				break;
2058			}
2059
2060
2061		}
2062
2063		DEBUG_TRACE(SLOT_HANDLER_LINE);
2064		parse_rx_slots(state);
2065	}
2066	return 0;
2067}
2068
2069
2070/* Called by the recycle thread */
2071int recycle_func(void *v);
2072int
2073recycle_func(void *v)
2074{
2075	VCHIQ_STATE_T *state = (VCHIQ_STATE_T *) v;
2076	VCHIQ_SHARED_STATE_T *local = state->local;
2077
2078	while (1) {
2079		remote_event_wait(&local->recycle);
2080
2081		process_free_queue(state);
2082	}
2083	return 0;
2084}
2085
2086
2087/* Called by the sync thread */
2088int sync_func(void *v);
2089int
2090sync_func(void *v)
2091{
2092	VCHIQ_STATE_T *state = (VCHIQ_STATE_T *) v;
2093	VCHIQ_SHARED_STATE_T *local = state->local;
2094	VCHIQ_HEADER_T *header = (VCHIQ_HEADER_T *)SLOT_DATA_FROM_INDEX(state,
2095		state->remote->slot_sync);
2096
2097	while (1) {
2098		VCHIQ_SERVICE_T *service;
2099		int msgid, size;
2100		int type;
2101		unsigned int localport, remoteport;
2102
2103		remote_event_wait(&local->sync_trigger);
2104
2105		rmb();
2106
2107		msgid = header->msgid;
2108		size = header->size;
2109		type = VCHIQ_MSG_TYPE(msgid);
2110		localport = VCHIQ_MSG_DSTPORT(msgid);
2111		remoteport = VCHIQ_MSG_SRCPORT(msgid);
2112
2113		service = find_service_by_port(state, localport);
2114
2115		if (!service) {
2116			vchiq_log_error(vchiq_sync_log_level,
2117				"%d: sf %s@%x (%d->%d) - "
2118				"invalid/closed service %d",
2119				state->id, msg_type_str(type),
2120				(unsigned int)header,
2121				remoteport, localport, localport);
2122			release_message_sync(state, header);
2123			continue;
2124		}
2125
2126		if (vchiq_sync_log_level >= VCHIQ_LOG_TRACE) {
2127			int svc_fourcc;
2128
2129			svc_fourcc = service
2130				? service->base.fourcc
2131				: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
2132			vchiq_log_trace(vchiq_sync_log_level,
2133				"Rcvd Msg %s from %c%c%c%c s:%d d:%d len:%d",
2134				msg_type_str(type),
2135				VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
2136				remoteport, localport, size);
2137			if (size > 0)
2138				vchiq_log_dump_mem("Rcvd", 0, header->data,
2139					min(64, size));
2140		}
2141
2142		switch (type) {
2143		case VCHIQ_MSG_OPENACK:
2144			if (size >= sizeof(struct vchiq_openack_payload)) {
2145				const struct vchiq_openack_payload *payload =
2146					(struct vchiq_openack_payload *)
2147					header->data;
2148				service->peer_version = payload->version;
2149			}
2150			vchiq_log_info(vchiq_sync_log_level,
2151				"%d: sf OPENACK@%x,%x (%d->%d) v:%d",
2152				state->id, (unsigned int)header, size,
2153				remoteport, localport, service->peer_version);
2154			if (service->srvstate == VCHIQ_SRVSTATE_OPENING) {
2155				service->remoteport = remoteport;
2156				vchiq_set_service_state(service,
2157					VCHIQ_SRVSTATE_OPENSYNC);
2158				up(&service->remove_event);
2159			}
2160			release_message_sync(state, header);
2161			break;
2162
2163		case VCHIQ_MSG_DATA:
2164			vchiq_log_trace(vchiq_sync_log_level,
2165				"%d: sf DATA@%x,%x (%d->%d)",
2166				state->id, (unsigned int)header, size,
2167				remoteport, localport);
2168
2169			if ((service->remoteport == remoteport) &&
2170				(service->srvstate ==
2171				VCHIQ_SRVSTATE_OPENSYNC)) {
2172				if (make_service_callback(service,
2173					VCHIQ_MESSAGE_AVAILABLE, header,
2174					NULL) == VCHIQ_RETRY)
2175					vchiq_log_error(vchiq_sync_log_level,
2176						"synchronous callback to "
2177						"service %d returns "
2178						"VCHIQ_RETRY",
2179						localport);
2180			}
2181			break;
2182
2183		default:
2184			vchiq_log_error(vchiq_sync_log_level,
2185				"%d: sf unexpected msgid %x@%x,%x",
2186				state->id, msgid, (unsigned int)header, size);
2187			release_message_sync(state, header);
2188			break;
2189		}
2190
2191		unlock_service(service);
2192	}
2193
2194	return 0;
2195}
2196
2197
2198static void
2199init_bulk_queue(VCHIQ_BULK_QUEUE_T *queue)
2200{
2201	queue->local_insert = 0;
2202	queue->remote_insert = 0;
2203	queue->process = 0;
2204	queue->remote_notify = 0;
2205	queue->remove = 0;
2206}
2207
2208
2209inline const char *
2210get_conn_state_name(VCHIQ_CONNSTATE_T conn_state)
2211{
2212	return conn_state_names[conn_state];
2213}
2214
2215
2216VCHIQ_SLOT_ZERO_T *
2217vchiq_init_slots(void *mem_base, int mem_size)
2218{
2219	int mem_align = (VCHIQ_SLOT_SIZE - (int)mem_base) & VCHIQ_SLOT_MASK;
2220	VCHIQ_SLOT_ZERO_T *slot_zero =
2221		(VCHIQ_SLOT_ZERO_T *)((char *)mem_base + mem_align);
2222	int num_slots = (mem_size - mem_align)/VCHIQ_SLOT_SIZE;
2223	int first_data_slot = VCHIQ_SLOT_ZERO_SLOTS;
2224
2225	/* Ensure there is enough memory to run an absolutely minimum system */
2226	num_slots -= first_data_slot;
2227
2228	if (num_slots < 4) {
2229		vchiq_log_error(vchiq_core_log_level,
2230			"vchiq_init_slots - insufficient memory %x bytes",
2231			mem_size);
2232		return NULL;
2233	}
2234
2235	memset(slot_zero, 0, sizeof(VCHIQ_SLOT_ZERO_T));
2236
2237	slot_zero->magic = VCHIQ_MAGIC;
2238	slot_zero->version = VCHIQ_VERSION;
2239	slot_zero->version_min = VCHIQ_VERSION_MIN;
2240	slot_zero->slot_zero_size = sizeof(VCHIQ_SLOT_ZERO_T);
2241	slot_zero->slot_size = VCHIQ_SLOT_SIZE;
2242	slot_zero->max_slots = VCHIQ_MAX_SLOTS;
2243	slot_zero->max_slots_per_side = VCHIQ_MAX_SLOTS_PER_SIDE;
2244
2245	slot_zero->master.slot_sync = first_data_slot;
2246	slot_zero->master.slot_first = first_data_slot + 1;
2247	slot_zero->master.slot_last = first_data_slot + (num_slots/2) - 1;
2248	slot_zero->slave.slot_sync = first_data_slot + (num_slots/2);
2249	slot_zero->slave.slot_first = first_data_slot + (num_slots/2) + 1;
2250	slot_zero->slave.slot_last = first_data_slot + num_slots - 1;
2251
2252	return slot_zero;
2253}
2254
2255VCHIQ_STATUS_T
2256vchiq_init_state(VCHIQ_STATE_T *state, VCHIQ_SLOT_ZERO_T *slot_zero,
2257		 int is_master)
2258{
2259	VCHIQ_SHARED_STATE_T *local;
2260	VCHIQ_SHARED_STATE_T *remote;
2261	VCHIQ_STATUS_T status;
2262	char threadname[10];
2263	static int id;
2264	int i;
2265
2266	/* Check the input configuration */
2267
2268	if (slot_zero->magic != VCHIQ_MAGIC) {
2269		vchiq_loud_error_header();
2270		vchiq_loud_error("Invalid VCHIQ magic value found.");
2271		vchiq_loud_error("slot_zero=%x: magic=%x (expected %x)",
2272			(unsigned int)slot_zero, slot_zero->magic, VCHIQ_MAGIC);
2273		vchiq_loud_error_footer();
2274		return VCHIQ_ERROR;
2275	}
2276
2277	vchiq_log_warning(vchiq_core_log_level,
2278		"%s: local ver %d (min %d), remote ver %d\n",
2279		__func__, VCHIQ_VERSION, VCHIQ_VERSION_MIN,
2280		slot_zero->version);
2281
2282	if (slot_zero->version < VCHIQ_VERSION_MIN) {
2283		vchiq_loud_error_header();
2284		vchiq_loud_error("Incompatible VCHIQ versions found.");
2285		vchiq_loud_error("slot_zero=%x: VideoCore version=%d "
2286			"(minimum %d)",
2287			(unsigned int)slot_zero, slot_zero->version,
2288			VCHIQ_VERSION_MIN);
2289		vchiq_loud_error("Restart with a newer VideoCore image.");
2290		vchiq_loud_error_footer();
2291		return VCHIQ_ERROR;
2292	}
2293
2294	if (VCHIQ_VERSION < slot_zero->version_min) {
2295		vchiq_loud_error_header();
2296		vchiq_loud_error("Incompatible VCHIQ versions found.");
2297		vchiq_loud_error("slot_zero=%x: version=%d (VideoCore "
2298			"minimum %d)",
2299			(unsigned int)slot_zero, VCHIQ_VERSION,
2300			slot_zero->version_min);
2301		vchiq_loud_error("Restart with a newer kernel.");
2302		vchiq_loud_error_footer();
2303		return VCHIQ_ERROR;
2304	}
2305
2306	if ((slot_zero->slot_zero_size != sizeof(VCHIQ_SLOT_ZERO_T)) ||
2307		 (slot_zero->slot_size != VCHIQ_SLOT_SIZE) ||
2308		 (slot_zero->max_slots != VCHIQ_MAX_SLOTS) ||
2309		 (slot_zero->max_slots_per_side != VCHIQ_MAX_SLOTS_PER_SIDE)) {
2310		vchiq_loud_error_header();
2311		if (slot_zero->slot_zero_size != sizeof(VCHIQ_SLOT_ZERO_T))
2312			vchiq_loud_error("slot_zero=%x: slot_zero_size=%x "
2313				"(expected %zx)",
2314				(unsigned int)slot_zero,
2315				slot_zero->slot_zero_size,
2316				sizeof(VCHIQ_SLOT_ZERO_T));
2317		if (slot_zero->slot_size != VCHIQ_SLOT_SIZE)
2318			vchiq_loud_error("slot_zero=%x: slot_size=%d "
2319				"(expected %d",
2320				(unsigned int)slot_zero, slot_zero->slot_size,
2321				VCHIQ_SLOT_SIZE);
2322		if (slot_zero->max_slots != VCHIQ_MAX_SLOTS)
2323			vchiq_loud_error("slot_zero=%x: max_slots=%d "
2324				"(expected %d)",
2325				(unsigned int)slot_zero, slot_zero->max_slots,
2326				VCHIQ_MAX_SLOTS);
2327		if (slot_zero->max_slots_per_side != VCHIQ_MAX_SLOTS_PER_SIDE)
2328			vchiq_loud_error("slot_zero=%x: max_slots_per_side=%d "
2329				"(expected %d)",
2330				(unsigned int)slot_zero,
2331				slot_zero->max_slots_per_side,
2332				VCHIQ_MAX_SLOTS_PER_SIDE);
2333		vchiq_loud_error_footer();
2334		return VCHIQ_ERROR;
2335	}
2336
2337	if (is_master) {
2338		local = &slot_zero->master;
2339		remote = &slot_zero->slave;
2340	} else {
2341		local = &slot_zero->slave;
2342		remote = &slot_zero->master;
2343	}
2344
2345	if (local->initialised) {
2346		vchiq_loud_error_header();
2347		if (remote->initialised)
2348			vchiq_loud_error("local state has already been "
2349				"initialised");
2350		else
2351			vchiq_loud_error("master/slave mismatch - two %ss",
2352				is_master ? "master" : "slave");
2353		vchiq_loud_error_footer();
2354		return VCHIQ_ERROR;
2355	}
2356
2357	memset(state, 0, sizeof(VCHIQ_STATE_T));
2358
2359	state->id = id++;
2360	state->is_master = is_master;
2361
2362	/*
2363		initialize shared state pointers
2364	 */
2365
2366	state->local = local;
2367	state->remote = remote;
2368	state->slot_data = (VCHIQ_SLOT_T *)slot_zero;
2369
2370	/*
2371		initialize events and mutexes
2372	 */
2373
2374	_sema_init(&state->connect, 0);
2375	lmutex_init(&state->mutex);
2376	_sema_init(&state->trigger_event, 0);
2377	_sema_init(&state->recycle_event, 0);
2378	_sema_init(&state->sync_trigger_event, 0);
2379	_sema_init(&state->sync_release_event, 0);
2380
2381	lmutex_init(&state->slot_mutex);
2382	lmutex_init(&state->recycle_mutex);
2383	lmutex_init(&state->sync_mutex);
2384	lmutex_init(&state->bulk_transfer_mutex);
2385
2386	_sema_init(&state->slot_available_event, 0);
2387	_sema_init(&state->slot_remove_event, 0);
2388	_sema_init(&state->data_quota_event, 0);
2389
2390	state->slot_queue_available = 0;
2391
2392	for (i = 0; i < VCHIQ_MAX_SERVICES; i++) {
2393		VCHIQ_SERVICE_QUOTA_T *service_quota =
2394			&state->service_quotas[i];
2395		_sema_init(&service_quota->quota_event, 0);
2396	}
2397
2398	for (i = local->slot_first; i <= local->slot_last; i++) {
2399		local->slot_queue[state->slot_queue_available++] = i;
2400		up(&state->slot_available_event);
2401	}
2402
2403	state->default_slot_quota = state->slot_queue_available/2;
2404	state->default_message_quota =
2405		min((unsigned short)(state->default_slot_quota * 256),
2406		(unsigned short)~0);
2407
2408	state->previous_data_index = -1;
2409	state->data_use_count = 0;
2410	state->data_quota = state->slot_queue_available - 1;
2411
2412	local->trigger.event = &state->trigger_event;
2413	remote_event_create(&local->trigger);
2414	local->tx_pos = 0;
2415
2416	local->recycle.event = &state->recycle_event;
2417	remote_event_create(&local->recycle);
2418	local->slot_queue_recycle = state->slot_queue_available;
2419
2420	local->sync_trigger.event = &state->sync_trigger_event;
2421	remote_event_create(&local->sync_trigger);
2422
2423	local->sync_release.event = &state->sync_release_event;
2424	remote_event_create(&local->sync_release);
2425
2426	/* At start-of-day, the slot is empty and available */
2427	((VCHIQ_HEADER_T *)SLOT_DATA_FROM_INDEX(state, local->slot_sync))->msgid
2428		= VCHIQ_MSGID_PADDING;
2429	remote_event_signal_local(&local->sync_release);
2430
2431	local->debug[DEBUG_ENTRIES] = DEBUG_MAX;
2432
2433	status = vchiq_platform_init_state(state);
2434
2435	/*
2436		bring up slot handler thread
2437	 */
2438	snprintf(threadname, sizeof(threadname), "VCHIQ-%d", state->id);
2439	state->slot_handler_thread = vchiq_thread_create(&slot_handler_func,
2440		(void *)state,
2441		threadname);
2442
2443	if (state->slot_handler_thread == NULL) {
2444		vchiq_loud_error_header();
2445		vchiq_loud_error("couldn't create thread %s", threadname);
2446		vchiq_loud_error_footer();
2447		return VCHIQ_ERROR;
2448	}
2449	set_user_nice(state->slot_handler_thread, -19);
2450	wake_up_process(state->slot_handler_thread);
2451
2452	snprintf(threadname, sizeof(threadname), "VCHIQr-%d", state->id);
2453	state->recycle_thread = vchiq_thread_create(&recycle_func,
2454		(void *)state,
2455		threadname);
2456	if (state->recycle_thread == NULL) {
2457		vchiq_loud_error_header();
2458		vchiq_loud_error("couldn't create thread %s", threadname);
2459		vchiq_loud_error_footer();
2460		return VCHIQ_ERROR;
2461	}
2462	set_user_nice(state->recycle_thread, -19);
2463	wake_up_process(state->recycle_thread);
2464
2465	snprintf(threadname, sizeof(threadname), "VCHIQs-%d", state->id);
2466	state->sync_thread = vchiq_thread_create(&sync_func,
2467		(void *)state,
2468		threadname);
2469	if (state->sync_thread == NULL) {
2470		vchiq_loud_error_header();
2471		vchiq_loud_error("couldn't create thread %s", threadname);
2472		vchiq_loud_error_footer();
2473		return VCHIQ_ERROR;
2474	}
2475	set_user_nice(state->sync_thread, -20);
2476	wake_up_process(state->sync_thread);
2477
2478	BUG_ON(state->id >= VCHIQ_MAX_STATES);
2479	vchiq_states[state->id] = state;
2480
2481	/* Indicate readiness to the other side */
2482	local->initialised = 1;
2483
2484	return status;
2485}
2486
2487/* Called from application thread when a client or server service is created. */
2488VCHIQ_SERVICE_T *
2489vchiq_add_service_internal(VCHIQ_STATE_T *state,
2490	const VCHIQ_SERVICE_PARAMS_T *params, int srvstate,
2491	VCHIQ_INSTANCE_T instance)
2492{
2493	VCHIQ_SERVICE_T *service;
2494
2495	service = kmalloc(sizeof(VCHIQ_SERVICE_T), GFP_KERNEL);
2496	if (service) {
2497		service->base.fourcc   = params->fourcc;
2498		service->base.callback = params->callback;
2499		service->base.userdata = params->userdata;
2500		service->handle        = VCHIQ_SERVICE_HANDLE_INVALID;
2501		service->ref_count     = 1;
2502		service->srvstate      = VCHIQ_SRVSTATE_FREE;
2503		service->localport     = VCHIQ_PORT_FREE;
2504		service->remoteport    = VCHIQ_PORT_FREE;
2505
2506		service->public_fourcc = (srvstate == VCHIQ_SRVSTATE_OPENING) ?
2507			VCHIQ_FOURCC_INVALID : params->fourcc;
2508		service->client_id     = 0;
2509		service->auto_close    = 1;
2510		service->sync          = 0;
2511		service->closing       = 0;
2512		atomic_set(&service->poll_flags, 0);
2513		service->version       = params->version;
2514		service->version_min   = params->version_min;
2515		service->state         = state;
2516		service->instance      = instance;
2517		service->service_use_count = 0;
2518		init_bulk_queue(&service->bulk_tx);
2519		init_bulk_queue(&service->bulk_rx);
2520		_sema_init(&service->remove_event, 0);
2521		_sema_init(&service->bulk_remove_event, 0);
2522		lmutex_init(&service->bulk_mutex);
2523		memset(&service->stats, 0, sizeof(service->stats));
2524	} else {
2525		vchiq_log_error(vchiq_core_log_level,
2526			"Out of memory");
2527	}
2528
2529	if (service) {
2530		VCHIQ_SERVICE_T **pservice = NULL;
2531		int i;
2532
2533		/* Although it is perfectly possible to use service_spinlock
2534		** to protect the creation of services, it is overkill as it
2535		** disables interrupts while the array is searched.
2536		** The only danger is of another thread trying to create a
2537		** service - service deletion is safe.
2538		** Therefore it is preferable to use state->mutex which,
2539		** although slower to claim, doesn't block interrupts while
2540		** it is held.
2541		*/
2542
2543		lmutex_lock(&state->mutex);
2544
2545		/* Prepare to use a previously unused service */
2546		if (state->unused_service < VCHIQ_MAX_SERVICES)
2547			pservice = &state->services[state->unused_service];
2548
2549		if (srvstate == VCHIQ_SRVSTATE_OPENING) {
2550			for (i = 0; i < state->unused_service; i++) {
2551				VCHIQ_SERVICE_T *srv = state->services[i];
2552				if (!srv) {
2553					pservice = &state->services[i];
2554					break;
2555				}
2556			}
2557		} else {
2558			for (i = (state->unused_service - 1); i >= 0; i--) {
2559				VCHIQ_SERVICE_T *srv = state->services[i];
2560				if (!srv)
2561					pservice = &state->services[i];
2562				else if ((srv->public_fourcc == params->fourcc)
2563					&& ((srv->instance != instance) ||
2564					(srv->base.callback !=
2565					params->callback))) {
2566					/* There is another server using this
2567					** fourcc which doesn't match. */
2568					pservice = NULL;
2569					break;
2570				}
2571			}
2572		}
2573
2574		if (pservice) {
2575			service->localport = (pservice - state->services);
2576			if (!handle_seq)
2577				handle_seq = VCHIQ_MAX_STATES *
2578					 VCHIQ_MAX_SERVICES;
2579			service->handle = handle_seq |
2580				(state->id * VCHIQ_MAX_SERVICES) |
2581				service->localport;
2582			handle_seq += VCHIQ_MAX_STATES * VCHIQ_MAX_SERVICES;
2583			*pservice = service;
2584			if (pservice == &state->services[state->unused_service])
2585				state->unused_service++;
2586		}
2587
2588		lmutex_unlock(&state->mutex);
2589
2590		if (!pservice) {
2591			kfree(service);
2592			service = NULL;
2593		}
2594	}
2595
2596	if (service) {
2597		VCHIQ_SERVICE_QUOTA_T *service_quota =
2598			&state->service_quotas[service->localport];
2599		service_quota->slot_quota = state->default_slot_quota;
2600		service_quota->message_quota = state->default_message_quota;
2601		if (service_quota->slot_use_count == 0)
2602			service_quota->previous_tx_index =
2603				SLOT_QUEUE_INDEX_FROM_POS(state->local_tx_pos)
2604				- 1;
2605
2606		/* Bring this service online */
2607		vchiq_set_service_state(service, srvstate);
2608
2609		vchiq_log_info(vchiq_core_msg_log_level,
2610			"%s Service %c%c%c%c SrcPort:%d",
2611			(srvstate == VCHIQ_SRVSTATE_OPENING)
2612			? "Open" : "Add",
2613			VCHIQ_FOURCC_AS_4CHARS(params->fourcc),
2614			service->localport);
2615	}
2616
2617	/* Don't unlock the service - leave it with a ref_count of 1. */
2618
2619	return service;
2620}
2621
2622VCHIQ_STATUS_T
2623vchiq_open_service_internal(VCHIQ_SERVICE_T *service, int client_id)
2624{
2625	struct vchiq_open_payload payload = {
2626		service->base.fourcc,
2627		client_id,
2628		service->version,
2629		service->version_min
2630	};
2631	VCHIQ_ELEMENT_T body = { &payload, sizeof(payload) };
2632	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
2633
2634	service->client_id = client_id;
2635	vchiq_use_service_internal(service);
2636	status = queue_message(service->state, NULL,
2637		VCHIQ_MAKE_MSG(VCHIQ_MSG_OPEN, service->localport, 0),
2638		&body, 1, sizeof(payload), 1);
2639	if (status == VCHIQ_SUCCESS) {
2640		if (down_interruptible(&service->remove_event) != 0) {
2641			status = VCHIQ_RETRY;
2642			vchiq_release_service_internal(service);
2643		} else if ((service->srvstate != VCHIQ_SRVSTATE_OPEN) &&
2644			(service->srvstate != VCHIQ_SRVSTATE_OPENSYNC)) {
2645			if (service->srvstate != VCHIQ_SRVSTATE_CLOSEWAIT)
2646				vchiq_log_error(vchiq_core_log_level,
2647					"%d: osi - srvstate = %s (ref %d)",
2648					service->state->id,
2649					srvstate_names[service->srvstate],
2650					service->ref_count);
2651			status = VCHIQ_ERROR;
2652			VCHIQ_SERVICE_STATS_INC(service, error_count);
2653			vchiq_release_service_internal(service);
2654		}
2655	}
2656	return status;
2657}
2658
2659static void
2660release_service_messages(VCHIQ_SERVICE_T *service)
2661{
2662	VCHIQ_STATE_T *state = service->state;
2663	int slot_last = state->remote->slot_last;
2664	int i;
2665
2666	/* Release any claimed messages */
2667	for (i = state->remote->slot_first; i <= slot_last; i++) {
2668		VCHIQ_SLOT_INFO_T *slot_info =
2669			SLOT_INFO_FROM_INDEX(state, i);
2670		if (slot_info->release_count != slot_info->use_count) {
2671			char *data =
2672				(char *)SLOT_DATA_FROM_INDEX(state, i);
2673			unsigned int pos, end;
2674
2675			end = VCHIQ_SLOT_SIZE;
2676			if (data == state->rx_data)
2677				/* This buffer is still being read from - stop
2678				** at the current read position */
2679				end = state->rx_pos & VCHIQ_SLOT_MASK;
2680
2681			pos = 0;
2682
2683			while (pos < end) {
2684				VCHIQ_HEADER_T *header =
2685					(VCHIQ_HEADER_T *)(data + pos);
2686				int msgid = header->msgid;
2687				int port = VCHIQ_MSG_DSTPORT(msgid);
2688				if ((port == service->localport) &&
2689					(msgid & VCHIQ_MSGID_CLAIMED)) {
2690					vchiq_log_info(vchiq_core_log_level,
2691						"  fsi - hdr %x",
2692						(unsigned int)header);
2693					release_slot(state, slot_info, header,
2694						NULL);
2695				}
2696				pos += calc_stride(header->size);
2697				if (pos > VCHIQ_SLOT_SIZE) {
2698					vchiq_log_error(vchiq_core_log_level,
2699						"fsi - pos %x: header %x, "
2700						"msgid %x, header->msgid %x, "
2701						"header->size %x",
2702						pos, (unsigned int)header,
2703						msgid, header->msgid,
2704						header->size);
2705					WARN(1, "invalid slot position\n");
2706				}
2707			}
2708		}
2709	}
2710}
2711
2712static int
2713do_abort_bulks(VCHIQ_SERVICE_T *service)
2714{
2715	VCHIQ_STATUS_T status;
2716
2717	/* Abort any outstanding bulk transfers */
2718	if (lmutex_lock_interruptible(&service->bulk_mutex) != 0)
2719		return 0;
2720	abort_outstanding_bulks(service, &service->bulk_tx);
2721	abort_outstanding_bulks(service, &service->bulk_rx);
2722	lmutex_unlock(&service->bulk_mutex);
2723
2724	status = notify_bulks(service, &service->bulk_tx, 0/*!retry_poll*/);
2725	if (status == VCHIQ_SUCCESS)
2726		status = notify_bulks(service, &service->bulk_rx,
2727			0/*!retry_poll*/);
2728	return (status == VCHIQ_SUCCESS);
2729}
2730
2731static VCHIQ_STATUS_T
2732close_service_complete(VCHIQ_SERVICE_T *service, int failstate)
2733{
2734	VCHIQ_STATUS_T status;
2735	int is_server = (service->public_fourcc != VCHIQ_FOURCC_INVALID);
2736	int newstate;
2737
2738	switch (service->srvstate) {
2739	case VCHIQ_SRVSTATE_OPEN:
2740	case VCHIQ_SRVSTATE_CLOSESENT:
2741	case VCHIQ_SRVSTATE_CLOSERECVD:
2742		if (is_server) {
2743			if (service->auto_close) {
2744				service->client_id = 0;
2745				service->remoteport = VCHIQ_PORT_FREE;
2746				newstate = VCHIQ_SRVSTATE_LISTENING;
2747			} else
2748				newstate = VCHIQ_SRVSTATE_CLOSEWAIT;
2749		} else
2750			newstate = VCHIQ_SRVSTATE_CLOSED;
2751		vchiq_set_service_state(service, newstate);
2752		break;
2753	case VCHIQ_SRVSTATE_LISTENING:
2754		break;
2755	default:
2756		vchiq_log_error(vchiq_core_log_level,
2757			"close_service_complete(%x) called in state %s",
2758			service->handle, srvstate_names[service->srvstate]);
2759		WARN(1, "close_service_complete in unexpected state\n");
2760		return VCHIQ_ERROR;
2761	}
2762
2763	status = make_service_callback(service,
2764		VCHIQ_SERVICE_CLOSED, NULL, NULL);
2765
2766	if (status != VCHIQ_RETRY) {
2767		int uc = service->service_use_count;
2768		int i;
2769		/* Complete the close process */
2770		for (i = 0; i < uc; i++)
2771			/* cater for cases where close is forced and the
2772			** client may not close all it's handles */
2773			vchiq_release_service_internal(service);
2774
2775		service->client_id = 0;
2776		service->remoteport = VCHIQ_PORT_FREE;
2777
2778		if (service->srvstate == VCHIQ_SRVSTATE_CLOSED)
2779			vchiq_free_service_internal(service);
2780		else if (service->srvstate != VCHIQ_SRVSTATE_CLOSEWAIT) {
2781			if (is_server)
2782				service->closing = 0;
2783
2784			up(&service->remove_event);
2785		}
2786	} else
2787		vchiq_set_service_state(service, failstate);
2788
2789	return status;
2790}
2791
2792/* Called by the slot handler */
2793VCHIQ_STATUS_T
2794vchiq_close_service_internal(VCHIQ_SERVICE_T *service, int close_recvd)
2795{
2796	VCHIQ_STATE_T *state = service->state;
2797	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
2798	int is_server = (service->public_fourcc != VCHIQ_FOURCC_INVALID);
2799
2800	vchiq_log_info(vchiq_core_log_level, "%d: csi:%d,%d (%s)",
2801		service->state->id, service->localport, close_recvd,
2802		srvstate_names[service->srvstate]);
2803
2804	switch (service->srvstate) {
2805	case VCHIQ_SRVSTATE_CLOSED:
2806	case VCHIQ_SRVSTATE_HIDDEN:
2807	case VCHIQ_SRVSTATE_LISTENING:
2808	case VCHIQ_SRVSTATE_CLOSEWAIT:
2809		if (close_recvd)
2810			vchiq_log_error(vchiq_core_log_level,
2811				"vchiq_close_service_internal(1) called "
2812				"in state %s",
2813				srvstate_names[service->srvstate]);
2814		else if (is_server) {
2815			if (service->srvstate == VCHIQ_SRVSTATE_LISTENING) {
2816				status = VCHIQ_ERROR;
2817			} else {
2818				service->client_id = 0;
2819				service->remoteport = VCHIQ_PORT_FREE;
2820				if (service->srvstate ==
2821					VCHIQ_SRVSTATE_CLOSEWAIT)
2822					vchiq_set_service_state(service,
2823						VCHIQ_SRVSTATE_LISTENING);
2824			}
2825			up(&service->remove_event);
2826		} else
2827			vchiq_free_service_internal(service);
2828		break;
2829	case VCHIQ_SRVSTATE_OPENING:
2830		if (close_recvd) {
2831			/* The open was rejected - tell the user */
2832			vchiq_set_service_state(service,
2833				VCHIQ_SRVSTATE_CLOSEWAIT);
2834			up(&service->remove_event);
2835		} else {
2836			/* Shutdown mid-open - let the other side know */
2837			status = queue_message(state, service,
2838				VCHIQ_MAKE_MSG
2839				(VCHIQ_MSG_CLOSE,
2840				service->localport,
2841				VCHIQ_MSG_DSTPORT(service->remoteport)),
2842				NULL, 0, 0, 0);
2843		}
2844		break;
2845
2846	case VCHIQ_SRVSTATE_OPENSYNC:
2847		lmutex_lock(&state->sync_mutex);
2848		/* Drop through */
2849
2850	case VCHIQ_SRVSTATE_OPEN:
2851		if (state->is_master || close_recvd) {
2852			if (!do_abort_bulks(service))
2853				status = VCHIQ_RETRY;
2854		}
2855
2856		release_service_messages(service);
2857
2858		if (status == VCHIQ_SUCCESS)
2859			status = queue_message(state, service,
2860				VCHIQ_MAKE_MSG
2861				(VCHIQ_MSG_CLOSE,
2862				service->localport,
2863				VCHIQ_MSG_DSTPORT(service->remoteport)),
2864				NULL, 0, 0, 0);
2865
2866		if (status == VCHIQ_SUCCESS) {
2867			if (!close_recvd)
2868				break;
2869		} else if (service->srvstate == VCHIQ_SRVSTATE_OPENSYNC) {
2870			lmutex_unlock(&state->sync_mutex);
2871			break;
2872		} else
2873			break;
2874
2875		status = close_service_complete(service,
2876				VCHIQ_SRVSTATE_CLOSERECVD);
2877		break;
2878
2879	case VCHIQ_SRVSTATE_CLOSESENT:
2880		if (!close_recvd)
2881			/* This happens when a process is killed mid-close */
2882			break;
2883
2884		if (!state->is_master) {
2885			if (!do_abort_bulks(service)) {
2886				status = VCHIQ_RETRY;
2887				break;
2888			}
2889		}
2890
2891		if (status == VCHIQ_SUCCESS)
2892			status = close_service_complete(service,
2893				VCHIQ_SRVSTATE_CLOSERECVD);
2894		break;
2895
2896	case VCHIQ_SRVSTATE_CLOSERECVD:
2897		if (!close_recvd && is_server)
2898			/* Force into LISTENING mode */
2899			vchiq_set_service_state(service,
2900				VCHIQ_SRVSTATE_LISTENING);
2901		status = close_service_complete(service,
2902			VCHIQ_SRVSTATE_CLOSERECVD);
2903		break;
2904
2905	default:
2906		vchiq_log_error(vchiq_core_log_level,
2907			"vchiq_close_service_internal(%d) called in state %s",
2908			close_recvd, srvstate_names[service->srvstate]);
2909		break;
2910	}
2911
2912	return status;
2913}
2914
2915/* Called from the application process upon process death */
2916void
2917vchiq_terminate_service_internal(VCHIQ_SERVICE_T *service)
2918{
2919	VCHIQ_STATE_T *state = service->state;
2920
2921	vchiq_log_info(vchiq_core_log_level, "%d: tsi - (%d<->%d)",
2922		state->id, service->localport, service->remoteport);
2923
2924	mark_service_closing(service);
2925
2926	/* Mark the service for removal by the slot handler */
2927	request_poll(state, service, VCHIQ_POLL_REMOVE);
2928}
2929
2930/* Called from the slot handler */
2931void
2932vchiq_free_service_internal(VCHIQ_SERVICE_T *service)
2933{
2934	VCHIQ_STATE_T *state = service->state;
2935
2936	vchiq_log_info(vchiq_core_log_level, "%d: fsi - (%d)",
2937		state->id, service->localport);
2938
2939	switch (service->srvstate) {
2940	case VCHIQ_SRVSTATE_OPENING:
2941	case VCHIQ_SRVSTATE_CLOSED:
2942	case VCHIQ_SRVSTATE_HIDDEN:
2943	case VCHIQ_SRVSTATE_LISTENING:
2944	case VCHIQ_SRVSTATE_CLOSEWAIT:
2945		break;
2946	default:
2947		vchiq_log_error(vchiq_core_log_level,
2948			"%d: fsi - (%d) in state %s",
2949			state->id, service->localport,
2950			srvstate_names[service->srvstate]);
2951		return;
2952	}
2953
2954	vchiq_set_service_state(service, VCHIQ_SRVSTATE_FREE);
2955
2956	up(&service->remove_event);
2957
2958	/* Release the initial lock */
2959	unlock_service(service);
2960}
2961
2962VCHIQ_STATUS_T
2963vchiq_connect_internal(VCHIQ_STATE_T *state, VCHIQ_INSTANCE_T instance)
2964{
2965	VCHIQ_SERVICE_T *service;
2966	int i;
2967
2968	/* Find all services registered to this client and enable them. */
2969	i = 0;
2970	while ((service = next_service_by_instance(state, instance,
2971		&i)) !=	NULL) {
2972		if (service->srvstate == VCHIQ_SRVSTATE_HIDDEN)
2973			vchiq_set_service_state(service,
2974				VCHIQ_SRVSTATE_LISTENING);
2975		unlock_service(service);
2976	}
2977
2978	if (state->conn_state == VCHIQ_CONNSTATE_DISCONNECTED) {
2979		if (queue_message(state, NULL,
2980			VCHIQ_MAKE_MSG(VCHIQ_MSG_CONNECT, 0, 0), NULL, 0,
2981			0, 1) == VCHIQ_RETRY)
2982			return VCHIQ_RETRY;
2983
2984		vchiq_set_conn_state(state, VCHIQ_CONNSTATE_CONNECTING);
2985	}
2986
2987	if (state->conn_state == VCHIQ_CONNSTATE_CONNECTING) {
2988		if (down_interruptible(&state->connect) != 0)
2989			return VCHIQ_RETRY;
2990
2991		vchiq_set_conn_state(state, VCHIQ_CONNSTATE_CONNECTED);
2992		up(&state->connect);
2993	}
2994
2995	return VCHIQ_SUCCESS;
2996}
2997
2998VCHIQ_STATUS_T
2999vchiq_shutdown_internal(VCHIQ_STATE_T *state, VCHIQ_INSTANCE_T instance)
3000{
3001	VCHIQ_SERVICE_T *service;
3002	int i;
3003
3004	/* Find all services registered to this client and enable them. */
3005	i = 0;
3006	while ((service = next_service_by_instance(state, instance,
3007		&i)) !=	NULL) {
3008		(void)vchiq_remove_service(service->handle);
3009		unlock_service(service);
3010	}
3011
3012	return VCHIQ_SUCCESS;
3013}
3014
3015VCHIQ_STATUS_T
3016vchiq_pause_internal(VCHIQ_STATE_T *state)
3017{
3018	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
3019
3020	switch (state->conn_state) {
3021	case VCHIQ_CONNSTATE_CONNECTED:
3022		/* Request a pause */
3023		vchiq_set_conn_state(state, VCHIQ_CONNSTATE_PAUSING);
3024		request_poll(state, NULL, 0);
3025		break;
3026	default:
3027		vchiq_log_error(vchiq_core_log_level,
3028			"vchiq_pause_internal in state %s\n",
3029			conn_state_names[state->conn_state]);
3030		status = VCHIQ_ERROR;
3031		VCHIQ_STATS_INC(state, error_count);
3032		break;
3033	}
3034
3035	return status;
3036}
3037
3038VCHIQ_STATUS_T
3039vchiq_resume_internal(VCHIQ_STATE_T *state)
3040{
3041	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
3042
3043	if (state->conn_state == VCHIQ_CONNSTATE_PAUSED) {
3044		vchiq_set_conn_state(state, VCHIQ_CONNSTATE_RESUMING);
3045		request_poll(state, NULL, 0);
3046	} else {
3047		status = VCHIQ_ERROR;
3048		VCHIQ_STATS_INC(state, error_count);
3049	}
3050
3051	return status;
3052}
3053
3054VCHIQ_STATUS_T
3055vchiq_close_service(VCHIQ_SERVICE_HANDLE_T handle)
3056{
3057	/* Unregister the service */
3058	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3059	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
3060
3061	if (!service)
3062		return VCHIQ_ERROR;
3063
3064	vchiq_log_info(vchiq_core_log_level,
3065		"%d: close_service:%d",
3066		service->state->id, service->localport);
3067
3068	if ((service->srvstate == VCHIQ_SRVSTATE_FREE) ||
3069		(service->srvstate == VCHIQ_SRVSTATE_LISTENING) ||
3070		(service->srvstate == VCHIQ_SRVSTATE_HIDDEN)) {
3071		unlock_service(service);
3072		return VCHIQ_ERROR;
3073	}
3074
3075	mark_service_closing(service);
3076
3077	if (current == service->state->slot_handler_thread) {
3078		status = vchiq_close_service_internal(service,
3079			0/*!close_recvd*/);
3080		BUG_ON(status == VCHIQ_RETRY);
3081	} else {
3082	/* Mark the service for termination by the slot handler */
3083		request_poll(service->state, service, VCHIQ_POLL_TERMINATE);
3084	}
3085
3086	while (1) {
3087		if (down_interruptible(&service->remove_event) != 0) {
3088			status = VCHIQ_RETRY;
3089			break;
3090		}
3091
3092		if ((service->srvstate == VCHIQ_SRVSTATE_FREE) ||
3093			(service->srvstate == VCHIQ_SRVSTATE_LISTENING) ||
3094			(service->srvstate == VCHIQ_SRVSTATE_OPEN))
3095			break;
3096
3097		vchiq_log_warning(vchiq_core_log_level,
3098			"%d: close_service:%d - waiting in state %s",
3099			service->state->id, service->localport,
3100			srvstate_names[service->srvstate]);
3101	}
3102
3103	if ((status == VCHIQ_SUCCESS) &&
3104		(service->srvstate != VCHIQ_SRVSTATE_FREE) &&
3105		(service->srvstate != VCHIQ_SRVSTATE_LISTENING))
3106		status = VCHIQ_ERROR;
3107
3108	unlock_service(service);
3109
3110	return status;
3111}
3112
3113VCHIQ_STATUS_T
3114vchiq_remove_service(VCHIQ_SERVICE_HANDLE_T handle)
3115{
3116	/* Unregister the service */
3117	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3118	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
3119
3120	if (!service)
3121		return VCHIQ_ERROR;
3122
3123	vchiq_log_info(vchiq_core_log_level,
3124		"%d: remove_service:%d",
3125		service->state->id, service->localport);
3126
3127	if (service->srvstate == VCHIQ_SRVSTATE_FREE) {
3128		unlock_service(service);
3129		return VCHIQ_ERROR;
3130	}
3131
3132	mark_service_closing(service);
3133
3134	if ((service->srvstate == VCHIQ_SRVSTATE_HIDDEN) ||
3135		(current == service->state->slot_handler_thread)) {
3136		/* Make it look like a client, because it must be removed and
3137		   not left in the LISTENING state. */
3138		service->public_fourcc = VCHIQ_FOURCC_INVALID;
3139
3140		status = vchiq_close_service_internal(service,
3141			0/*!close_recvd*/);
3142		BUG_ON(status == VCHIQ_RETRY);
3143	} else {
3144		/* Mark the service for removal by the slot handler */
3145		request_poll(service->state, service, VCHIQ_POLL_REMOVE);
3146	}
3147	while (1) {
3148		if (down_interruptible(&service->remove_event) != 0) {
3149			status = VCHIQ_RETRY;
3150			break;
3151		}
3152
3153		if ((service->srvstate == VCHIQ_SRVSTATE_FREE) ||
3154			(service->srvstate == VCHIQ_SRVSTATE_OPEN))
3155			break;
3156
3157		vchiq_log_warning(vchiq_core_log_level,
3158			"%d: remove_service:%d - waiting in state %s",
3159			service->state->id, service->localport,
3160			srvstate_names[service->srvstate]);
3161	}
3162
3163	if ((status == VCHIQ_SUCCESS) &&
3164		(service->srvstate != VCHIQ_SRVSTATE_FREE))
3165		status = VCHIQ_ERROR;
3166
3167	unlock_service(service);
3168
3169	return status;
3170}
3171
3172
3173/* This function may be called by kernel threads or user threads.
3174 * User threads may receive VCHIQ_RETRY to indicate that a signal has been
3175 * received and the call should be retried after being returned to user
3176 * context.
3177 * When called in blocking mode, the userdata field points to a bulk_waiter
3178 * structure.
3179 */
3180VCHIQ_STATUS_T
3181vchiq_bulk_transfer(VCHIQ_SERVICE_HANDLE_T handle,
3182	VCHI_MEM_HANDLE_T memhandle, void *offset, int size, void *userdata,
3183	VCHIQ_BULK_MODE_T mode, VCHIQ_BULK_DIR_T dir)
3184{
3185	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3186	VCHIQ_BULK_QUEUE_T *queue;
3187	VCHIQ_BULK_T *bulk;
3188	VCHIQ_STATE_T *state;
3189	struct bulk_waiter *bulk_waiter = NULL;
3190	const char dir_char = (dir == VCHIQ_BULK_TRANSMIT) ? 't' : 'r';
3191	const int dir_msgtype = (dir == VCHIQ_BULK_TRANSMIT) ?
3192		VCHIQ_MSG_BULK_TX : VCHIQ_MSG_BULK_RX;
3193	VCHIQ_STATUS_T status = VCHIQ_ERROR;
3194
3195	if (!service ||
3196		 (service->srvstate != VCHIQ_SRVSTATE_OPEN) ||
3197		 ((memhandle == VCHI_MEM_HANDLE_INVALID) && (offset == NULL)) ||
3198		 (vchiq_check_service(service) != VCHIQ_SUCCESS))
3199		goto error_exit;
3200
3201	switch (mode) {
3202	case VCHIQ_BULK_MODE_NOCALLBACK:
3203	case VCHIQ_BULK_MODE_CALLBACK:
3204		break;
3205	case VCHIQ_BULK_MODE_BLOCKING:
3206		bulk_waiter = (struct bulk_waiter *)userdata;
3207		_sema_init(&bulk_waiter->event, 0);
3208		bulk_waiter->actual = 0;
3209		bulk_waiter->bulk = NULL;
3210		break;
3211	case VCHIQ_BULK_MODE_WAITING:
3212		bulk_waiter = (struct bulk_waiter *)userdata;
3213		bulk = bulk_waiter->bulk;
3214		goto waiting;
3215	default:
3216		goto error_exit;
3217	}
3218
3219	state = service->state;
3220
3221	queue = (dir == VCHIQ_BULK_TRANSMIT) ?
3222		&service->bulk_tx : &service->bulk_rx;
3223
3224	if (lmutex_lock_interruptible(&service->bulk_mutex) != 0) {
3225		status = VCHIQ_RETRY;
3226		goto error_exit;
3227	}
3228
3229	if (queue->local_insert == queue->remove + VCHIQ_NUM_SERVICE_BULKS) {
3230		VCHIQ_SERVICE_STATS_INC(service, bulk_stalls);
3231		do {
3232			lmutex_unlock(&service->bulk_mutex);
3233			if (down_interruptible(&service->bulk_remove_event)
3234				!= 0) {
3235				status = VCHIQ_RETRY;
3236				goto error_exit;
3237			}
3238			if (lmutex_lock_interruptible(&service->bulk_mutex)
3239				!= 0) {
3240				status = VCHIQ_RETRY;
3241				goto error_exit;
3242			}
3243		} while (queue->local_insert == queue->remove +
3244				VCHIQ_NUM_SERVICE_BULKS);
3245	}
3246
3247	bulk = &queue->bulks[BULK_INDEX(queue->local_insert)];
3248
3249	bulk->mode = mode;
3250	bulk->dir = dir;
3251	bulk->userdata = userdata;
3252	bulk->size = size;
3253	bulk->actual = VCHIQ_BULK_ACTUAL_ABORTED;
3254
3255	if (vchiq_prepare_bulk_data(bulk, memhandle,
3256		(void*)offset, size, dir) !=
3257		VCHIQ_SUCCESS)
3258		goto unlock_error_exit;
3259
3260	wmb();
3261
3262	vchiq_log_info(vchiq_core_log_level,
3263		"%d: bt (%d->%d) %cx %x@%x %x",
3264		state->id,
3265		service->localport, service->remoteport, dir_char,
3266		size, (unsigned int)bulk->data, (unsigned int)userdata);
3267
3268	if (state->is_master) {
3269		queue->local_insert++;
3270		if (resolve_bulks(service, queue))
3271			request_poll(state, service,
3272				(dir == VCHIQ_BULK_TRANSMIT) ?
3273				VCHIQ_POLL_TXNOTIFY : VCHIQ_POLL_RXNOTIFY);
3274	} else {
3275		int payload[2] = { (int)bulk->data, bulk->size };
3276		VCHIQ_ELEMENT_T element = { payload, sizeof(payload) };
3277
3278		status = queue_message(state, NULL,
3279			VCHIQ_MAKE_MSG(dir_msgtype,
3280				service->localport, service->remoteport),
3281			&element, 1, sizeof(payload), 1);
3282		if (status != VCHIQ_SUCCESS) {
3283			vchiq_complete_bulk(bulk);
3284			goto unlock_error_exit;
3285		}
3286		queue->local_insert++;
3287	}
3288
3289	lmutex_unlock(&service->bulk_mutex);
3290
3291	vchiq_log_trace(vchiq_core_log_level,
3292		"%d: bt:%d %cx li=%x ri=%x p=%x",
3293		state->id,
3294		service->localport, dir_char,
3295		queue->local_insert, queue->remote_insert, queue->process);
3296
3297waiting:
3298	unlock_service(service);
3299
3300	status = VCHIQ_SUCCESS;
3301
3302	if (bulk_waiter) {
3303		bulk_waiter->bulk = bulk;
3304		if (down_interruptible(&bulk_waiter->event) != 0)
3305			status = VCHIQ_RETRY;
3306		else if (bulk_waiter->actual == VCHIQ_BULK_ACTUAL_ABORTED)
3307			status = VCHIQ_ERROR;
3308	}
3309
3310	return status;
3311
3312unlock_error_exit:
3313	lmutex_unlock(&service->bulk_mutex);
3314
3315error_exit:
3316	if (service)
3317		unlock_service(service);
3318	return status;
3319}
3320
3321VCHIQ_STATUS_T
3322vchiq_queue_message(VCHIQ_SERVICE_HANDLE_T handle,
3323	const VCHIQ_ELEMENT_T *elements, int count)
3324{
3325	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3326	VCHIQ_STATUS_T status = VCHIQ_ERROR;
3327
3328	unsigned int size = 0;
3329	unsigned int i;
3330
3331	if (!service ||
3332		(vchiq_check_service(service) != VCHIQ_SUCCESS))
3333		goto error_exit;
3334
3335	for (i = 0; i < (unsigned int)count; i++) {
3336		if (elements[i].size) {
3337			if (elements[i].data == NULL) {
3338				VCHIQ_SERVICE_STATS_INC(service, error_count);
3339				goto error_exit;
3340			}
3341			size += elements[i].size;
3342		}
3343	}
3344
3345	if (size > VCHIQ_MAX_MSG_SIZE) {
3346		VCHIQ_SERVICE_STATS_INC(service, error_count);
3347		goto error_exit;
3348	}
3349
3350	switch (service->srvstate) {
3351	case VCHIQ_SRVSTATE_OPEN:
3352		status = queue_message(service->state, service,
3353				VCHIQ_MAKE_MSG(VCHIQ_MSG_DATA,
3354					service->localport,
3355					service->remoteport),
3356				elements, count, size, 1);
3357		break;
3358	case VCHIQ_SRVSTATE_OPENSYNC:
3359		status = queue_message_sync(service->state, service,
3360				VCHIQ_MAKE_MSG(VCHIQ_MSG_DATA,
3361					service->localport,
3362					service->remoteport),
3363				elements, count, size, 1);
3364		break;
3365	default:
3366		status = VCHIQ_ERROR;
3367		break;
3368	}
3369
3370error_exit:
3371	if (service)
3372		unlock_service(service);
3373
3374	return status;
3375}
3376
3377void
3378vchiq_release_message(VCHIQ_SERVICE_HANDLE_T handle, VCHIQ_HEADER_T *header)
3379{
3380	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3381	VCHIQ_SHARED_STATE_T *remote;
3382	VCHIQ_STATE_T *state;
3383	int slot_index;
3384
3385	if (!service)
3386		return;
3387
3388	state = service->state;
3389	remote = state->remote;
3390
3391	slot_index = SLOT_INDEX_FROM_DATA(state, (void *)header);
3392
3393	if ((slot_index >= remote->slot_first) &&
3394		(slot_index <= remote->slot_last)) {
3395		int msgid = header->msgid;
3396		if (msgid & VCHIQ_MSGID_CLAIMED) {
3397			VCHIQ_SLOT_INFO_T *slot_info =
3398				SLOT_INFO_FROM_INDEX(state, slot_index);
3399
3400			release_slot(state, slot_info, header, service);
3401		}
3402	} else if (slot_index == remote->slot_sync)
3403		release_message_sync(state, header);
3404
3405	unlock_service(service);
3406}
3407
3408static void
3409release_message_sync(VCHIQ_STATE_T *state, VCHIQ_HEADER_T *header)
3410{
3411	header->msgid = VCHIQ_MSGID_PADDING;
3412	wmb();
3413	remote_event_signal(&state->remote->sync_release);
3414}
3415
3416VCHIQ_STATUS_T
3417vchiq_get_peer_version(VCHIQ_SERVICE_HANDLE_T handle, short *peer_version)
3418{
3419   VCHIQ_STATUS_T status = VCHIQ_ERROR;
3420   VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3421
3422   if (!service ||
3423      (vchiq_check_service(service) != VCHIQ_SUCCESS) ||
3424      !peer_version)
3425      goto exit;
3426   *peer_version = service->peer_version;
3427   status = VCHIQ_SUCCESS;
3428
3429exit:
3430   if (service)
3431      unlock_service(service);
3432   return status;
3433}
3434
3435VCHIQ_STATUS_T
3436vchiq_get_config(VCHIQ_INSTANCE_T instance,
3437	int config_size, VCHIQ_CONFIG_T *pconfig)
3438{
3439	VCHIQ_CONFIG_T config;
3440
3441	(void)instance;
3442
3443	config.max_msg_size           = VCHIQ_MAX_MSG_SIZE;
3444	config.bulk_threshold         = VCHIQ_MAX_MSG_SIZE;
3445	config.max_outstanding_bulks  = VCHIQ_NUM_SERVICE_BULKS;
3446	config.max_services           = VCHIQ_MAX_SERVICES;
3447	config.version                = VCHIQ_VERSION;
3448	config.version_min            = VCHIQ_VERSION_MIN;
3449
3450	if (config_size > sizeof(VCHIQ_CONFIG_T))
3451		return VCHIQ_ERROR;
3452
3453	memcpy(pconfig, &config,
3454		min(config_size, (int)(sizeof(VCHIQ_CONFIG_T))));
3455
3456	return VCHIQ_SUCCESS;
3457}
3458
3459VCHIQ_STATUS_T
3460vchiq_set_service_option(VCHIQ_SERVICE_HANDLE_T handle,
3461	VCHIQ_SERVICE_OPTION_T option, int value)
3462{
3463	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3464	VCHIQ_STATUS_T status = VCHIQ_ERROR;
3465
3466	if (service) {
3467		switch (option) {
3468		case VCHIQ_SERVICE_OPTION_AUTOCLOSE:
3469			service->auto_close = value;
3470			status = VCHIQ_SUCCESS;
3471			break;
3472
3473		case VCHIQ_SERVICE_OPTION_SLOT_QUOTA: {
3474			VCHIQ_SERVICE_QUOTA_T *service_quota =
3475				&service->state->service_quotas[
3476					service->localport];
3477			if (value == 0)
3478				value = service->state->default_slot_quota;
3479			if ((value >= service_quota->slot_use_count) &&
3480				 (value < (unsigned short)~0)) {
3481				service_quota->slot_quota = value;
3482				if ((value >= service_quota->slot_use_count) &&
3483					(service_quota->message_quota >=
3484					 service_quota->message_use_count)) {
3485					/* Signal the service that it may have
3486					** dropped below its quota */
3487					up(&service_quota->quota_event);
3488				}
3489				status = VCHIQ_SUCCESS;
3490			}
3491		} break;
3492
3493		case VCHIQ_SERVICE_OPTION_MESSAGE_QUOTA: {
3494			VCHIQ_SERVICE_QUOTA_T *service_quota =
3495				&service->state->service_quotas[
3496					service->localport];
3497			if (value == 0)
3498				value = service->state->default_message_quota;
3499			if ((value >= service_quota->message_use_count) &&
3500				 (value < (unsigned short)~0)) {
3501				service_quota->message_quota = value;
3502				if ((value >=
3503					service_quota->message_use_count) &&
3504					(service_quota->slot_quota >=
3505					service_quota->slot_use_count))
3506					/* Signal the service that it may have
3507					** dropped below its quota */
3508					up(&service_quota->quota_event);
3509				status = VCHIQ_SUCCESS;
3510			}
3511		} break;
3512
3513		case VCHIQ_SERVICE_OPTION_SYNCHRONOUS:
3514			if ((service->srvstate == VCHIQ_SRVSTATE_HIDDEN) ||
3515				(service->srvstate ==
3516				VCHIQ_SRVSTATE_LISTENING)) {
3517				service->sync = value;
3518				status = VCHIQ_SUCCESS;
3519			}
3520			break;
3521
3522		default:
3523			break;
3524		}
3525		unlock_service(service);
3526	}
3527
3528	return status;
3529}
3530
3531static void
3532vchiq_dump_shared_state(void *dump_context, VCHIQ_STATE_T *state,
3533	VCHIQ_SHARED_STATE_T *shared, const char *label)
3534{
3535	static const char *const debug_names[] = {
3536		"<entries>",
3537		"SLOT_HANDLER_COUNT",
3538		"SLOT_HANDLER_LINE",
3539		"PARSE_LINE",
3540		"PARSE_HEADER",
3541		"PARSE_MSGID",
3542		"AWAIT_COMPLETION_LINE",
3543		"DEQUEUE_MESSAGE_LINE",
3544		"SERVICE_CALLBACK_LINE",
3545		"MSG_QUEUE_FULL_COUNT",
3546		"COMPLETION_QUEUE_FULL_COUNT"
3547	};
3548	int i;
3549
3550	char buf[80];
3551	int len;
3552	len = snprintf(buf, sizeof(buf),
3553		"  %s: slots %d-%d tx_pos=%x recycle=%x",
3554		label, shared->slot_first, shared->slot_last,
3555		shared->tx_pos, shared->slot_queue_recycle);
3556	vchiq_dump(dump_context, buf, len + 1);
3557
3558	len = snprintf(buf, sizeof(buf),
3559		"    Slots claimed:");
3560	vchiq_dump(dump_context, buf, len + 1);
3561
3562	for (i = shared->slot_first; i <= shared->slot_last; i++) {
3563		VCHIQ_SLOT_INFO_T slot_info = *SLOT_INFO_FROM_INDEX(state, i);
3564		if (slot_info.use_count != slot_info.release_count) {
3565			len = snprintf(buf, sizeof(buf),
3566				"      %d: %d/%d", i, slot_info.use_count,
3567				slot_info.release_count);
3568			vchiq_dump(dump_context, buf, len + 1);
3569		}
3570	}
3571
3572	for (i = 1; i < shared->debug[DEBUG_ENTRIES]; i++) {
3573		len = snprintf(buf, sizeof(buf), "    DEBUG: %s = %d(%x)",
3574			debug_names[i], shared->debug[i], shared->debug[i]);
3575		vchiq_dump(dump_context, buf, len + 1);
3576	}
3577}
3578
3579void
3580vchiq_dump_state(void *dump_context, VCHIQ_STATE_T *state)
3581{
3582	char buf[80];
3583	int len;
3584	int i;
3585
3586	len = snprintf(buf, sizeof(buf), "State %d: %s", state->id,
3587		conn_state_names[state->conn_state]);
3588	vchiq_dump(dump_context, buf, len + 1);
3589
3590	len = snprintf(buf, sizeof(buf),
3591		"  tx_pos=%x(@%x), rx_pos=%x(@%x)",
3592		state->local->tx_pos,
3593		(uint32_t)state->tx_data +
3594			(state->local_tx_pos & VCHIQ_SLOT_MASK),
3595		state->rx_pos,
3596		(uint32_t)state->rx_data +
3597			(state->rx_pos & VCHIQ_SLOT_MASK));
3598	vchiq_dump(dump_context, buf, len + 1);
3599
3600	len = snprintf(buf, sizeof(buf),
3601		"  Version: %d (min %d)",
3602		VCHIQ_VERSION, VCHIQ_VERSION_MIN);
3603	vchiq_dump(dump_context, buf, len + 1);
3604
3605	if (VCHIQ_ENABLE_STATS) {
3606		len = snprintf(buf, sizeof(buf),
3607			"  Stats: ctrl_tx_count=%d, ctrl_rx_count=%d, "
3608			"error_count=%d",
3609			state->stats.ctrl_tx_count, state->stats.ctrl_rx_count,
3610			state->stats.error_count);
3611		vchiq_dump(dump_context, buf, len + 1);
3612	}
3613
3614	len = snprintf(buf, sizeof(buf),
3615		"  Slots: %d available (%d data), %d recyclable, %d stalls "
3616		"(%d data)",
3617		((state->slot_queue_available * VCHIQ_SLOT_SIZE) -
3618			state->local_tx_pos) / VCHIQ_SLOT_SIZE,
3619		state->data_quota - state->data_use_count,
3620		state->local->slot_queue_recycle - state->slot_queue_available,
3621		state->stats.slot_stalls, state->stats.data_stalls);
3622	vchiq_dump(dump_context, buf, len + 1);
3623
3624	vchiq_dump_platform_state(dump_context);
3625
3626	vchiq_dump_shared_state(dump_context, state, state->local, "Local");
3627	vchiq_dump_shared_state(dump_context, state, state->remote, "Remote");
3628
3629	vchiq_dump_platform_instances(dump_context);
3630
3631	for (i = 0; i < state->unused_service; i++) {
3632		VCHIQ_SERVICE_T *service = find_service_by_port(state, i);
3633
3634		if (service) {
3635			vchiq_dump_service_state(dump_context, service);
3636			unlock_service(service);
3637		}
3638	}
3639}
3640
3641void
3642vchiq_dump_service_state(void *dump_context, VCHIQ_SERVICE_T *service)
3643{
3644	char buf[80];
3645	int len;
3646
3647	len = snprintf(buf, sizeof(buf), "Service %d: %s (ref %u)",
3648		service->localport, srvstate_names[service->srvstate],
3649		service->ref_count - 1); /*Don't include the lock just taken*/
3650
3651	if (service->srvstate != VCHIQ_SRVSTATE_FREE) {
3652		char remoteport[30];
3653		VCHIQ_SERVICE_QUOTA_T *service_quota =
3654			&service->state->service_quotas[service->localport];
3655		int fourcc = service->base.fourcc;
3656		int tx_pending, rx_pending;
3657		if (service->remoteport != VCHIQ_PORT_FREE) {
3658			int len2 = snprintf(remoteport, sizeof(remoteport),
3659				"%d", service->remoteport);
3660			if (service->public_fourcc != VCHIQ_FOURCC_INVALID)
3661				snprintf(remoteport + len2,
3662					sizeof(remoteport) - len2,
3663					" (client %x)", service->client_id);
3664		} else
3665			strcpy(remoteport, "n/a");
3666
3667		len += snprintf(buf + len, sizeof(buf) - len,
3668			" '%c%c%c%c' remote %s (msg use %d/%d, slot use %d/%d)",
3669			VCHIQ_FOURCC_AS_4CHARS(fourcc),
3670			remoteport,
3671			service_quota->message_use_count,
3672			service_quota->message_quota,
3673			service_quota->slot_use_count,
3674			service_quota->slot_quota);
3675
3676		vchiq_dump(dump_context, buf, len + 1);
3677
3678		tx_pending = service->bulk_tx.local_insert -
3679			service->bulk_tx.remote_insert;
3680
3681		rx_pending = service->bulk_rx.local_insert -
3682			service->bulk_rx.remote_insert;
3683
3684		len = snprintf(buf, sizeof(buf),
3685			"  Bulk: tx_pending=%d (size %d),"
3686			" rx_pending=%d (size %d)",
3687			tx_pending,
3688			tx_pending ? service->bulk_tx.bulks[
3689			BULK_INDEX(service->bulk_tx.remove)].size : 0,
3690			rx_pending,
3691			rx_pending ? service->bulk_rx.bulks[
3692			BULK_INDEX(service->bulk_rx.remove)].size : 0);
3693
3694		if (VCHIQ_ENABLE_STATS) {
3695			vchiq_dump(dump_context, buf, len + 1);
3696
3697			len = snprintf(buf, sizeof(buf),
3698				"  Ctrl: tx_count=%d, tx_bytes=%llu, "
3699				"rx_count=%d, rx_bytes=%llu",
3700				service->stats.ctrl_tx_count,
3701				service->stats.ctrl_tx_bytes,
3702				service->stats.ctrl_rx_count,
3703				service->stats.ctrl_rx_bytes);
3704			vchiq_dump(dump_context, buf, len + 1);
3705
3706			len = snprintf(buf, sizeof(buf),
3707				"  Bulk: tx_count=%d, tx_bytes=%llu, "
3708				"rx_count=%d, rx_bytes=%llu",
3709				service->stats.bulk_tx_count,
3710				service->stats.bulk_tx_bytes,
3711				service->stats.bulk_rx_count,
3712				service->stats.bulk_rx_bytes);
3713			vchiq_dump(dump_context, buf, len + 1);
3714
3715			len = snprintf(buf, sizeof(buf),
3716				"  %d quota stalls, %d slot stalls, "
3717				"%d bulk stalls, %d aborted, %d errors",
3718				service->stats.quota_stalls,
3719				service->stats.slot_stalls,
3720				service->stats.bulk_stalls,
3721				service->stats.bulk_aborted_count,
3722				service->stats.error_count);
3723		 }
3724	}
3725
3726	vchiq_dump(dump_context, buf, len + 1);
3727
3728	if (service->srvstate != VCHIQ_SRVSTATE_FREE)
3729		vchiq_dump_platform_service_state(dump_context, service);
3730}
3731
3732
3733void
3734vchiq_loud_error_header(void)
3735{
3736	vchiq_log_error(vchiq_core_log_level,
3737		"============================================================"
3738		"================");
3739	vchiq_log_error(vchiq_core_log_level,
3740		"============================================================"
3741		"================");
3742	vchiq_log_error(vchiq_core_log_level, "=====");
3743}
3744
3745void
3746vchiq_loud_error_footer(void)
3747{
3748	vchiq_log_error(vchiq_core_log_level, "=====");
3749	vchiq_log_error(vchiq_core_log_level,
3750		"============================================================"
3751		"================");
3752	vchiq_log_error(vchiq_core_log_level,
3753		"============================================================"
3754		"================");
3755}
3756
3757
3758VCHIQ_STATUS_T vchiq_send_remote_use(VCHIQ_STATE_T *state)
3759{
3760	VCHIQ_STATUS_T status = VCHIQ_RETRY;
3761	if (state->conn_state != VCHIQ_CONNSTATE_DISCONNECTED)
3762		status = queue_message(state, NULL,
3763			VCHIQ_MAKE_MSG(VCHIQ_MSG_REMOTE_USE, 0, 0),
3764			NULL, 0, 0, 0);
3765	return status;
3766}
3767
3768VCHIQ_STATUS_T vchiq_send_remote_release(VCHIQ_STATE_T *state)
3769{
3770	VCHIQ_STATUS_T status = VCHIQ_RETRY;
3771	if (state->conn_state != VCHIQ_CONNSTATE_DISCONNECTED)
3772		status = queue_message(state, NULL,
3773			VCHIQ_MAKE_MSG(VCHIQ_MSG_REMOTE_RELEASE, 0, 0),
3774			NULL, 0, 0, 0);
3775	return status;
3776}
3777
3778VCHIQ_STATUS_T vchiq_send_remote_use_active(VCHIQ_STATE_T *state)
3779{
3780	VCHIQ_STATUS_T status = VCHIQ_RETRY;
3781	if (state->conn_state != VCHIQ_CONNSTATE_DISCONNECTED)
3782		status = queue_message(state, NULL,
3783			VCHIQ_MAKE_MSG(VCHIQ_MSG_REMOTE_USE_ACTIVE, 0, 0),
3784			NULL, 0, 0, 0);
3785	return status;
3786}
3787
3788void vchiq_log_dump_mem(const char *label, uint32_t addr, const void *voidMem,
3789	size_t numBytes)
3790{
3791	const uint8_t  *mem = (const uint8_t *)voidMem;
3792	size_t          offset;
3793	char            lineBuf[100];
3794	char           *s;
3795
3796	while (numBytes > 0) {
3797		s = lineBuf;
3798
3799		for (offset = 0; offset < 16; offset++) {
3800			if (offset < numBytes)
3801				s += snprintf(s, 4, "%02x ", mem[offset]);
3802			else
3803				s += snprintf(s, 4, "   ");
3804		}
3805
3806		for (offset = 0; offset < 16; offset++) {
3807			if (offset < numBytes) {
3808				uint8_t ch = mem[offset];
3809
3810				if ((ch < ' ') || (ch > '~'))
3811					ch = '.';
3812				*s++ = (char)ch;
3813			}
3814		}
3815		*s++ = '\0';
3816
3817		if ((label != NULL) && (*label != '\0'))
3818			vchiq_log_trace(VCHIQ_LOG_TRACE,
3819				"%s: %08x: %s", label, addr, lineBuf);
3820		else
3821			vchiq_log_trace(VCHIQ_LOG_TRACE,
3822				"%08x: %s", addr, lineBuf);
3823
3824		addr += 16;
3825		mem += 16;
3826		if (numBytes > 16)
3827			numBytes -= 16;
3828		else
3829			numBytes = 0;
3830	}
3831}
3832