group-agent.c 6.69 KB
Newer Older
Timothy Stack's avatar
 
Timothy Stack committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/*
 * EMULAB-COPYRIGHT
 * Copyright (c) 2004, 2005 University of Utah and the Flux Group.
 * All rights reserved.
 */

/**
 * @file group-agent.c
 */

#include "config.h"

#include <assert.h>

#include "group-agent.h"

/**
 * Event expansion callback, this function will expand the given event to the
 * same number of events as agents in this group.
 *
 * The notification's object type is rewritten to the type of the first member
 * agent and the HMAC is re-inserted.  When the group has more than one
 * member, the event's agent list and length are pointed at the group's agent
 * array and member count; with exactly one member the event's agent and
 * length are left untouched.
 *
 * @param la This group agent object.
 * @param se The sched_event_t to expand.
 * @return Zero on success, -1 with errno set to EBUSY if the group is still
 * waiting for COMPLETE events from an earlier event.
 */
static int group_agent_expand(local_agent_t la, sched_event_t *se);

/**
 * The "immediate" callback for handling events sent to a group.  If the event
 * is flagged SEF_SENDS_COMPLETE and the group is idle, the group records the
 * event's TOKEN and the number of COMPLETEs it has to wait for.  The event is
 * then passed on to the members: queued on their local handler(s) when the
 * first member has one, otherwise sent as one notification per member with
 * the object name rewritten to that member's name.
 *
 * @param la This group agent object.
 * @param se The sched_event_t to handle.
 * @return Zero; the current implementation has no failure path.
 */
static int group_agent_immediate(local_agent_t la, sched_event_t *se);

group_agent_t create_group_agent(struct agent *agent)
{
	struct agent **agent_array = NULL;
	group_agent_t ga, retval;
	
	if ((ga = malloc(sizeof(struct _group_agent))) == NULL) {
		retval = NULL;
		errno = ENOMEM;
	}
	else if ((agent_array = malloc(sizeof(struct agent *))) == NULL) {
		retval = NULL;
		errno = ENOMEM;
	}
	else if (local_agent_init(&ga->ga_local_agent) != 0) {
		retval = NULL;
	}
	else {
		ga->ga_local_agent.la_link.ln_Name = agent->name;
		ga->ga_local_agent.la_agent = agent;
		ga->ga_local_agent.la_flags |= LAF_IMMEDIATE;
		ga->ga_local_agent.la_expand = group_agent_expand;
		ga->ga_local_agent.la_immediate = group_agent_immediate;
		ga->ga_token = ~0;
		ga->ga_agents = agent_array;
		agent_array = NULL;
		ga->ga_agents[0] = agent;
		ga->ga_count = 0;
		ga->ga_remaining = -1;

		retval = ga;
		ga = NULL;
	}

	free(agent_array);
	agent_array = NULL;

	free(ga);
	ga = NULL;
	
	return retval;
}

/**
 * Check the internal consistency of a group agent: the embedded local agent
 * must be valid, the agent array must exist, and every member (slots one
 * through ga_count) must be non-NULL and share the first member's object
 * type.
 *
 * @param ga The group agent to check.
 * @return One, always; violations trip an assert instead.
 */
int group_agent_invariant(group_agent_t ga)
{
	assert(ga != NULL);
	assert(local_agent_invariant(&ga->ga_local_agent));

	assert(ga->ga_agents != NULL);

	/*
	 * A group without members only has slot zero allocated, so slot one
	 * may be inspected only after at least one member has been appended.
	 */
	if (ga->ga_count > 0) {
		char *type;
		int lpc;

		assert(ga->ga_agents[1] != NULL);
		type = ga->ga_agents[1]->objtype;
		for (lpc = 1; lpc <= ga->ga_count; lpc++) {
			assert(ga->ga_agents[lpc] != NULL);
			assert(strcmp(ga->ga_agents[lpc]->objtype,
				      type) == 0);
		}
	}

	return 1;
}

/**
 * Add an agent to the end of the group's member list, growing the agent
 * array by one slot.
 *
 * @param ga The group agent to extend.
 * @param agent The agent to add as a member.
 * @return Zero on success, -1 with errno set to ENOMEM if the array could
 * not be grown; the group is left unchanged in that case.
 */
int group_agent_append(group_agent_t ga, struct agent *agent)
{
	struct agent **grown;
	int retval = -1;

	assert(ga != NULL);
	assert(group_agent_invariant(ga));
	assert(agent != NULL);
	assert(agent_invariant(agent));

	/* Slot zero plus the current members plus the new one. */
	grown = realloc(ga->ga_agents,
			(ga->ga_count + 2) * sizeof(struct agent *));
	if (grown != NULL) {
		ga->ga_agents = grown;
		ga->ga_count += 1;
		ga->ga_agents[ga->ga_count] = agent;
		retval = 0;
	}
	else {
		/* The old array is still valid and still owned by ga. */
		errno = ENOMEM;
	}

	assert(group_agent_invariant(ga));

	return retval;
}

/**
 * Account for a COMPLETE event against every group in the list.  Each group
 * that is waiting on COMPLETEs, contains the given agent, and carries the
 * same token has its outstanding count decremented and its error value raised
 * to agerror if that is larger.  When a group's outstanding count is zero, a
 * COMPLETE event is sent on behalf of the group and the group is returned to
 * the idle state.
 *
 * @param handle The event handle used to send the group's COMPLETE.
 * @param list The list of group agents to check.
 * @param agent The agent the COMPLETE is matched against group members.
 * @param ctoken The completion token, compared with each group's token.
 * @param agerror The error value to fold into matching groups.
 * @return The number of groups for which a COMPLETE event was sent.
 */
int group_agent_handle_complete(event_handle_t handle,
				struct lnList *list,
				struct agent *agent,
				int ctoken,
				int agerror)
{
	group_agent_t ga;
	int retval = 0;

	assert(handle != NULL);
	assert(list != NULL);
	lnCheck(list);
	assert(agent != NULL);
	assert(agent_invariant(agent));

	/* Walk the list until the node that has no successor. */
	ga = (group_agent_t)list->lh_Head;
	while (ga->ga_local_agent.la_link.ln_Succ != NULL) {
		int lpc, found = 0;

		assert(group_agent_invariant(ga));

		if (ga->ga_remaining > 0) {
			/* Members live in slots one through ga_count. */
			for (lpc = 1; (lpc <= ga->ga_count) && !found; lpc++) {
#if 0
				printf("cmp %s %s  %d %d\n",
				       agent->name,
				       ga->ga_agents[lpc]->name,
				       ctoken,
				       ga->ga_token);
#endif
				if ((agent == ga->ga_agents[lpc]) &&
				    (ga->ga_token == ctoken)) {
					ga->ga_remaining -= 1;
					/* Keep the largest error seen. */
					if (agerror > ga->ga_error) {
						ga->ga_error = agerror;
					}

					found = 1;
				}
			}
		}

		/*
		 * Checked outside the guard above, so a group whose count is
		 * already zero on entry is reported as well.
		 */
		if (ga->ga_remaining == 0) {
#if 0
			printf("group complete %s\n",
			       ga->ga_local_agent.la_link.ln_Name);
#endif
			event_do(handle,
				 EA_Experiment, pideid,
				 EA_Type, TBDB_OBJECTTYPE_GROUP,
				 EA_Name, ga->ga_local_agent.la_link.ln_Name,
				 EA_Event, TBDB_EVENTTYPE_COMPLETE,
				 EA_ArgInteger, "ERROR", ga->ga_error,
				 EA_ArgInteger, "CTOKEN", ga->ga_token,
				 EA_TAG_DONE);

			/* Back to idle: no token, nothing outstanding. */
			ga->ga_token = ~0;
			ga->ga_remaining = -1;

			retval += 1;
		}
		else {
#if 0
			printf("group remains %d\n", ga->ga_remaining);
#endif
		}

		ga = (group_agent_t)ga->ga_local_agent.la_link.ln_Succ;
	}

	return retval;
}

/*
 * Expansion callback: retarget the notification at the members' object type
 * and, for groups with several members, hand the scheduler the group's agent
 * array so the event is repeated per member.
 */
static int group_agent_expand(local_agent_t la, sched_event_t *se)
{
	group_agent_t ga = (group_agent_t)la;
	event_handle_t handle;
	event_notification_t en;

	assert(la != NULL);
	assert(group_agent_invariant(ga));
	assert(ga->ga_count > 0);
	assert(se != NULL);
	assert(se->length == 1);
	assert(se->flags & SEF_SINGLE_HANDLER);
	assert(se->notification != NULL);

	handle = la->la_handle;
	en = se->notification;

	/*
	 * All members share one object type (see the invariant), so the type
	 * is swapped in once here rather than once per member.
	 */
	event_notification_clear_objtype(handle, en);
	event_notification_set_objtype(handle, en, ga->ga_agents[1]->objtype);
	event_notification_insert_hmac(handle, en);

	/* Anything but -1 means COMPLETEs are still outstanding. */
	if (ga->ga_remaining != -1) {
		error("group %s is already active and waiting for COMPLETEs\n",
		      la->la_agent->name);
		errno = EBUSY;
		return -1;
	}

	/* A single-member group leaves the event's agent and length as is. */
	if (ga->ga_count != 1) {
		se->agent.m = ga->ga_agents;
		se->length = ga->ga_count;
	}

	return 0;
}

/*
 * Immediate callback: arm the group's COMPLETE bookkeeping when the event
 * will elicit COMPLETEs, then pass the event on to every member, either
 * through the members' local handler(s) or as one notification per member.
 * Always returns zero.
 */
static int group_agent_immediate(local_agent_t la, sched_event_t *se)
{
	group_agent_t ga = (group_agent_t)la;
	event_handle_t handle;
	int retval;

	assert(la != NULL);
	assert(group_agent_invariant(ga));
	/* Slot one is dereferenced below, so a member must exist. */
	assert(ga->ga_count > 0);
	assert(se != NULL);

	handle = la->la_handle;

	if (ga->ga_token != ~0) {
		warning("group already active\n");
		// XXX what to do? need to let STOPs go through...
	}
	else if (se->flags & SEF_SENDS_COMPLETE) {
		/*
		 * Sending this type of event will elicit a COMPLETE
		 * event from the agents, so we need to update the
		 * group object so it will wait for all the COMPLETEs
		 * before sending its own.
		 */
		event_notification_get_int32(handle,
					     se->notification,
					     "TOKEN",
					     &ga->ga_token);
		ga->ga_error = 0;
		ga->ga_remaining = ga->ga_count;
	}

	if (ga->ga_agents[1]->handler != NULL) {
		if (ga->ga_agents[1]->handler->la_flags & LAF_MULTIPLE) {
			/* One queued event on the first member's handler. */
			local_agent_queue(ga->ga_agents[1]->handler, se);
		}
		else {
			int lpc;

			/* Queue the event on each member's own handler. */
			for (lpc = 1; lpc <= ga->ga_count; lpc++) {
				local_agent_queue(ga->ga_agents[lpc]->handler,
						  se);
			}
		}

		retval = 0;
	}
	else {
		event_notification_t en;
		int lpc;

		/*
		 * No local handler: reuse the one notification, renaming it
		 * to each member in turn and re-inserting the HMAC before
		 * every send.
		 */
		en = se->notification;
		for (lpc = 1; lpc <= ga->ga_count; lpc++) {
			event_notification_clear_objname(handle, en);
			event_notification_set_objname(
				handle, en, ga->ga_agents[lpc]->name);
			event_notification_insert_hmac(handle, en);

			event_notify(handle, en);
		}

		retval = 0;
	}

	return retval;
}