Tutorial 5: Request Resource Allocation in a Cluster and Start Containers Based on Host Loading
This tutorial describes how to create a registered EGO client that requests resource allocation in a cluster and starts containers on the hosts. Based on host loading, the program either adds more resources or releases some.
In this tutorial, you will:
- Open a connection to Platform EGO
- Print out cluster information
- Check if there are any registered clients connected to Platform EGO
- Log on to Platform EGO
- Register the client with Platform EGO
- Print out allocation and container reply information from a previous connection
- Print out host group information
- Determine the number of available host slots and make a resource allocation request for half of them
- Store allocation requests in a resource queue and make allocation requests to Platform EGO
- Retrieve the allocation reply from the work queue and start a container on each host slot
- Add or release a resource depending on host loading
- Check for registered clients connected to Platform EGO and print out info
- Unregister the client
Underlying principles
Refer to Tutorial 4: Underlying principles for a description of the multi-thread technique used in this sample.
Step 1: Preprocessor directives
The first step is to include the system and API header files. The samples.h header file contains the method declarations that are common to all of the samples.
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <sys/time.h>
#include "vem.api.h"
#include "samples.h"
Step 2: Implement the principal method
Lines 4-7: define and initialize a data structure that is used to request a connection with the EGO host cluster. The data structure contains a reference to a configuration file where the master host name and port numbers are stored.
Line 8: pass the data structure as an argument to the vem_open() method, which opens a connection to the master host. If the connection attempt is successful, a handle is returned; otherwise the method returns NULL. The handle acts as a communication channel to the master host, and all subsequent communication occurs through this handle.
Lines 16-17: the vem_name_t pointer (clusterName) is initialized to NULL. The vem_name_t structure holds the cluster name, system name, and version. The vem_uname() method is passed the communication handle and, if successful, returns a valid vem_name_t structure; otherwise the method returns NULL.
Lines 24-25: the cluster information is printed to the screen.
Lines 26-43: locate all the registered clients and print out the client info (name, description, and location). Define the client info structure, then use vem_locate() to get the registered clients. Because NULL is provided as the client name, all registered clients are located, and the method returns their number. Note that Platform EGO is equipped with a number of default clients (services) such as the Service Controller, so at a minimum, the info for these clients is printed out. The associated memory is then released.
  1  int
  2  sample5()
  3  {
  4      vem_openreq_t orequest;
  5      vem_handle_t *vhandle = NULL;
  6      orequest.file = "ego.conf";
  7      orequest.flags=0;
  8      vhandle = vem_open(&orequest);
  9
 10      if (vhandle == NULL) {
 11          // error opening
 12          fprintf(stderr, "Error opening cluster: %s\n", vem_strerror(vemerrno));
 13          return -1;
 14      }
 15
 16      vem_name_t *clusterName = NULL;
 17      clusterName = vem_uname(vhandle);
 18      if (clusterName == NULL) {
 19          // error connecting
 20          fprintf(stderr, "Error connecting to cluster: %s\n",
 21                  vem_strerror(vemerrno));
 22          return -2;
 23      }
 24      fprintf(stdout, " Connected... %s %s %4.2f\n", clusterName->clustername,
 25              clusterName->sysname, clusterName->version);
 26      vem_clientinfo_t *clients;
 27      int rc = vem_locate(vhandle, NULL, &clients);
 28      if (rc >=0) {
 29          if (rc == 0) {
 30              printf("No registered clients exist\n");
 31          } else {
 32              int i=0;
 33              for (i=0; i<rc; i++) {
 34                  printf("%s %s %s\n", clients[i].name, clients[i].description,
 35                         clients[i].location);
 36              }
 37              // free
 38              vem_clear_clientinfo(clients);
 39          }
 40      } else {
 41          // error connecting
 42          fprintf(stderr, "Error getting clients: %s\n", vem_strerror(vemerrno));
 43      }
Lines 44-47: authenticate the user to Platform EGO.
Lines 48-52: define and initialize a structure for callback methods. These callback methods are invoked by Platform EGO when resources are added or reclaimed, or when a change occurs to host status or a container. When Platform EGO wants to communicate about these events, it invokes these methods thereby calling back to the client.
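The callback structure follows the common C pattern of a struct of function pointers. The sketch below illustrates that pattern with hypothetical stand-in types and names (demo_callbacks_t, demo_event_t, dispatch_event); none of them come from the EGO API, and the real callback signatures are declared in vem.api.h. It also zero-initializes the structure in its usage so that any member left unassigned is NULL rather than indeterminate; whether vem_clientcallback_t has members beyond the four assigned in the listing is not stated here.

```c
#include <stddef.h>

/* Hypothetical stand-in for a callback table; not the EGO vem_clientcallback_t. */
typedef struct {
    void (*on_add)(int slots);      /* invoked when slots are granted */
    void (*on_reclaim)(int slots);  /* invoked when slots are taken back */
} demo_callbacks_t;

typedef enum { DEMO_EVENT_ADD, DEMO_EVENT_RECLAIM } demo_event_t;

/* Client-side state updated by the handlers. */
static int allocated_slots = 0;

static void handle_add(int slots)     { allocated_slots += slots; }
static void handle_reclaim(int slots) { allocated_slots -= slots; }

/* Plays the role of the framework: look up the handler for an event and
 * call back into the client. Returns 0 if a handler ran, -1 otherwise. */
int dispatch_event(const demo_callbacks_t *cb, demo_event_t ev, int slots)
{
    void (*handler)(int) = NULL;

    if (cb == NULL) {
        return -1;
    }
    handler = (ev == DEMO_EVENT_ADD) ? cb->on_add : cb->on_reclaim;
    if (handler == NULL) {
        return -1;  /* this member was never assigned */
    }
    handler(slots);
    return 0;
}
```

A caller would declare the table as `demo_callbacks_t cb = {0};`, assign only the handlers it implements (for example `cb.on_add = handle_add;`), and hand `&cb` to the dispatcher.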
Lines 53-67: define the vem_allocation_info_reply_t and vem_container_info_reply_t structures. If a client gets disconnected and then re-registers, its existing allocations and containers are returned in these structures; if the client has never registered before, the structures are empty. Define and initialize a structure (rreq) that holds client info for registration purposes, including a pointer to the client callback structure (cbf) in its cb member. Register with Platform EGO via the open connection using vem_register().
Lines 68-71: print out the allocation and container information returned by the registration call. Once the info is printed out, any allocations left over from a previous connection are released.
Lines 72-82: the vem_gethostgroupinfo() method collects the information for the requested hostgroup. In this case, the requested hostgroup in the input argument is set to NULL, which means that information about all hostgroups is requested. If the method call is successful, hostgroup information is printed to the screen.
 44      if (login(vhandle, username, password)<0) {
 45          fprintf(stderr, "Error logon: %s\n", vem_strerror(vemerrno));
 46          goto leave;
 47      }
 48      vem_clientcallback_t cbf;
 49      cbf.addResource = addResourceCB;
 50      cbf.reclaimForce = reclaimForceCB;
 51      cbf.containerStateChg = containerStateChgCB;
 52      cbf.hostStateChange = hostStateChangeCB;
 53      vem_allocation_info_reply_t aireply;
 54      vem_container_info_reply_t cireply;
 55      vem_registerreq_t rreq;
 56
 57      rreq.name = "sample5_client";
 58      rreq.description = "Sample5 Client";
 59      rreq.flags = VEM_REGISTER_TTL;
 60      rreq.ttl = 3;
 61      rreq.cb = &cbf;
 62
 63      rc = vem_register(vhandle, &rreq, &aireply, &cireply);
 64      if (rc < 0) {
 65          fprintf(stderr, "Error registering: %s\n", vem_strerror(vemerrno));
 66          goto leave;
 67      }
 68      print_vem_allocation_info_reply(&aireply);
 69      print_vem_container_info_reply(&cireply);
 70      // freeup any previous allocations
 71      release_vem_allocation(vhandle, &aireply);
 72      vem_hostgroupreq_t hgroupreq;
 73      hgroupreq.grouplist = NULL;
 74      vem_hostgroup_t *hgroup;
 75      rc = vem_gethostgroupinfo(vhandle, &hgroupreq, &hgroup);
 76      if (rc < 0) {
 77          fprintf(stderr, "Error getting hostgroup: %s\n",
 78                  vem_strerror(vemerrno));
 79      } else {
 80          printf("%s %s %d %d\n", hgroup->groupName, hgroup->members, hgroup->free,
 81                 hgroup->allocated);
 82      }
Lines 83-95: define and initialize structures for the workload, resources, and monitor threads. Global pointers (workloadP, resourcesP, monitorP) are set to these structures so that the callback methods can find the queues, locks, and condition variables.
Lines 97-107: create and run the three threads. Refer to Tutorial 4: Underlying principles for further details of the workload and resource threads.
Lines 109-117: get the number of available host slots and make a resource allocation request via the add_resources() method. The comment on line 108 and the commented-out expression on line 112 describe requesting half of the available slots (or one slot if only one is available); as listed, the request count is hard-coded to 4. The add_resources() method adds a new allocation request to the resource queue and increments the queue index (next_item). It also sets the condition variable, which tells the waiting resource_thread that a new allocation request has been added to the resource queue. The resource_thread resumes execution and the resource_mutex object is unlocked.
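The "half of the available slots, but at least one" rule that appears as a commented-out expression on line 112 can be written as a small helper. This is only a sketch: slots_to_request is a hypothetical name, not part of the sample sources, and the guard for non-positive input mirrors the numavailable > 0 check on line 111.

```c
/* Returns how many slots to request: half of the available slots,
 * at least 1 when any slot is available, and 0 when none are. */
int slots_to_request(int available)
{
    int half;

    if (available <= 0) {
        return 0;
    }
    half = available / 2;   /* integer division rounds down */
    return half > 1 ? half : 1;
}
```

With this helper, line 112 could read `int num_request = slots_to_request(numavailable);` in place of the hard-coded value.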
 83      pthread_t worker_thread, resource_thread, monitor_thread;
 84      work_state_t workload;
 85      resource_state_t resources;
 86      monitor_state_t monitor;
 87
 88      // globals so that callback functions can find the queues/lock/cond var
 89      workloadP = &workload;
 90      resourcesP = &resources;
 91      monitorP = &monitor;
 92
 93      initialize_workload(&workload, vhandle);
 94      initialize_resources(&resources, vhandle);
 95      initialize_monitor(&monitor, vhandle);
 96
 97      if (pthread_create(&worker_thread, NULL, work_thread_fn, &workload)) {
 98          perror("Error creating worker thread: ");
 99      }
100      if (pthread_create(&resource_thread, NULL, resource_thread_fn,
101                         &resources)) {
102          perror("Error creating resource thread: ");
103      }
104      if (pthread_create(&monitor_thread, NULL, monitor_thread_extended_fn,
105                         &monitor)) {
106          perror("Error creating monitor thread: ");
107      }
108      // Request half of them, one if just one is available
109      int numavailable = getNumberOfHostSlotsAvailable(vhandle);
110      fprintf(stderr, "Available Slots=%d\n", numavailable);
111      if(numavailable > 0) {
112          int num_request = 4; //(numavailable / 2) > 1 ? (numavailable / 2): 1; //3;
113          vem_allocreq_t *aloc_spec = get_alloc_spec();
114          // aloc_spec->maxslots = 1;
115          // add to request Q
116          add_resources(num_request, aloc_spec);
117      }
When a resource is added, the addResourceCB() callback method is executed. The callback method adds the allocation reply structure to the workload queue at position next_item and increments the index (next_item). The condition variable is set, which tells the waiting work_thread that a new allocation reply has been added to the workload queue. The work_thread resumes execution and the work_mutex object is unlocked. The allocation reply is also printed out.
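The queue-and-signal pattern described for add_resources() and addResourceCB() can be sketched with plain pthreads. This is a minimal illustration under stated assumptions: request_queue_t, queue_init, queue_push, and queue_pop_blocking are hypothetical names, the queue holds integers rather than allocation requests or replies, and the fixed capacity is arbitrary. The actual queue structures belong to the sample sources and are described in Tutorial 4.

```c
#include <pthread.h>

#define QUEUE_CAPACITY 16  /* arbitrary size chosen for this sketch */

typedef struct {
    int items[QUEUE_CAPACITY];
    int next_item;            /* index of the next free position */
    pthread_mutex_t mutex;
    pthread_cond_t cond;
} request_queue_t;

void queue_init(request_queue_t *q)
{
    q->next_item = 0;
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->cond, NULL);
}

/* Producer side: store the item at next_item, advance the index, and
 * signal the waiting consumer. Returns 0 on success, -1 if the queue is full. */
int queue_push(request_queue_t *q, int item)
{
    int rc = -1;

    pthread_mutex_lock(&q->mutex);
    if (q->next_item < QUEUE_CAPACITY) {
        q->items[q->next_item++] = item;
        pthread_cond_signal(&q->cond);
        rc = 0;
    }
    pthread_mutex_unlock(&q->mutex);
    return rc;
}

/* Consumer side: sleep until at least one item is queued, then remove
 * and return the oldest one. */
int queue_pop_blocking(request_queue_t *q)
{
    int item, i;

    pthread_mutex_lock(&q->mutex);
    while (q->next_item == 0) {
        /* Releases the mutex while waiting and re-acquires it on wake-up. */
        pthread_cond_wait(&q->cond, &q->mutex);
    }
    item = q->items[0];
    for (i = 1; i < q->next_item; i++) {
        q->items[i - 1] = q->items[i];
    }
    q->next_item--;
    pthread_mutex_unlock(&q->mutex);
    return item;
}
```

The consumer re-checks the queue in a while loop because pthread_cond_wait() may return without a matching signal; testing the condition again after every wake-up keeps the consumer from reading an empty queue.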
Lines 118-119: pause the main thread for 180 seconds. The finalize() method then sets the shutdown flag to 1 and sets the condition variable for all three threads. The shutdown flag causes the three threads to end execution.
Lines 122-128: block the main thread until all three threads have finished. Clean up the thread states by destroying the mutex object and condition variable associated with each thread.
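The shutdown sequence (set a flag, wake each waiting thread, join, then destroy the synchronization objects) can be sketched as follows. The names worker_state_t, worker_fn, and run_shutdown_demo are hypothetical, and each worker here carries its own shutdown flag so that the flag is always accessed under that worker's mutex; the sample's finalize() and finalize_*() methods may be organized differently.

```c
#include <pthread.h>

#define NUM_WORKERS 3

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int shutdown;   /* set by the main thread to request exit */
    int exited;     /* set by the worker just before it returns */
} worker_state_t;

/* Worker: sleeps on its condition variable until shutdown is requested. */
static void *worker_fn(void *arg)
{
    worker_state_t *s = (worker_state_t *)arg;

    pthread_mutex_lock(&s->mutex);
    while (!s->shutdown) {
        pthread_cond_wait(&s->cond, &s->mutex);
    }
    s->exited = 1;
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}

/* Starts the workers, asks them to stop, joins them, and cleans up.
 * Returns the number of workers that exited cleanly. */
int run_shutdown_demo(void)
{
    pthread_t threads[NUM_WORKERS];
    worker_state_t states[NUM_WORKERS];
    int created = 0, exited = 0, i;

    for (i = 0; i < NUM_WORKERS; i++) {
        states[i].shutdown = 0;
        states[i].exited = 0;
        pthread_mutex_init(&states[i].mutex, NULL);
        pthread_cond_init(&states[i].cond, NULL);
        if (pthread_create(&threads[i], NULL, worker_fn, &states[i]) != 0) {
            pthread_cond_destroy(&states[i].cond);
            pthread_mutex_destroy(&states[i].mutex);
            break;  /* stop creating; shut down the ones already running */
        }
        created++;
    }

    /* Equivalent of the finalize step: set the flag and wake each thread. */
    for (i = 0; i < created; i++) {
        pthread_mutex_lock(&states[i].mutex);
        states[i].shutdown = 1;
        pthread_cond_signal(&states[i].cond);
        pthread_mutex_unlock(&states[i].mutex);
    }

    /* Block until each thread has finished, then destroy its objects. */
    for (i = 0; i < created; i++) {
        pthread_join(threads[i], NULL);
        exited += states[i].exited;
        pthread_cond_destroy(&states[i].cond);
        pthread_mutex_destroy(&states[i].mutex);
    }
    return exited;
}
```

The mutex and condition variable are destroyed only after pthread_join() returns, so no thread can still be using them.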
Lines 130-145: use vem_locate() to get all registered clients. Because NULL is provided as the client name, all registered clients are located, and the method returns their number. If successful, print out the client info and free the associated memory.
Lines 146-158: unregister the client, log off from Platform EGO, free the cluster name structure, and close the connection.
118      sleep(180);
119      finalize();
120      // wait for worker, resource, monitor threads to finish
121
122      pthread_join(worker_thread, NULL);
123      pthread_join(resource_thread, NULL);
124      pthread_join(monitor_thread, NULL);
125      // clean up thread states
126      finalize_workload(workloadP);
127      finalize_resources(resourcesP);
128      finalize_monitor(monitorP);
129
130      rc = vem_locate(vhandle, NULL, &clients);
131      if (rc >=0) {
132          if (rc == 0) {
133              printf("No registered clients exist\n");
134          } else {
135              int i=0;
136              for (i=0; i<rc; i++) {
137                  printf("%s %s %s\n", clients[i].name, clients[i].description,
138                         clients[i].location);
139              }
140              vem_clear_clientinfo(clients);
141          }
142      } else {
143          // error connecting
144          fprintf(stderr, "Error getting clients: %s\n", vem_strerror(vemerrno));
145      }
146  bailout:
147      rc = vem_unregister(vhandle);
148      if (rc < 0) {
149          fprintf(stderr, "Error unregistering: %s\n", vem_strerror(vemerrno));
150      }
151      if (logout(vhandle)<0) {
152          fprintf(stderr, "Error logoff: %s\n", vem_strerror(vemerrno));
153      }
154  leave:
155      // free memory
156      vem_free_uname(clusterName);
157      vem_close(vhandle);
158      return 0;
159  }
Step 3: Add or release resources based on average host load (monitor thread)
Lock the monitor_mutex and get the current time. The thread then waits for either a host state change to be signalled by Platform EGO or the wait time (30 seconds in this sample) to expire. If a state change occurs, the corresponding callback method (hostStateChangeCB) is invoked by Platform EGO, which updates the host state in the resource collection. The condition variable is then set to reactivate the monitor thread. If the thread resumes execution because the wait time expired, the average host load is calculated and printed out; refer to Tutorial 4: Step 7: Calculate the average activity load on the resources.
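pthread_cond_timedwait() takes an absolute deadline rather than a duration, which is why the monitor thread reads the current time before waiting. The helper below is a sketch of that computation. deadline_after is a hypothetical name, and unlike the sample it accepts the wait in milliseconds and carries any nanosecond overflow into the seconds field.

```c
#include <sys/time.h>
#include <time.h>

/* Converts "now plus wait_ms milliseconds" into the absolute timespec
 * expected by pthread_cond_timedwait(). Assumes wait_ms >= 0. */
struct timespec deadline_after(struct timeval now, long wait_ms)
{
    struct timespec ts;
    /* Sub-second parts in nanoseconds; the sum stays below 2e9. */
    long nsec = (long)now.tv_usec * 1000L + (wait_ms % 1000L) * 1000000L;

    ts.tv_sec = now.tv_sec + wait_ms / 1000L + nsec / 1000000000L;
    ts.tv_nsec = nsec % 1000000000L;  /* always within 0..999999999 */
    return ts;
}
```

A caller would obtain the current time with gettimeofday(&now, NULL) and pass the address of the resulting timespec as the third argument of pthread_cond_timedwait(); a return value of ETIMEDOUT then indicates that the deadline passed without a signal.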
If the average host load is greater than the upper threshold (UPPER_THRESHOLD), an additional resource is requested via the add_resources() method; refer to Tutorial 4: Step 6: Client callback methods for further details of the add_resources() method. If the average host load is less than the lower threshold (LOWER_THRESHOLD) and the number of allocated hosts is greater than 0, a resource is released via the release_resources() method; refer to the next step.
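The scaling decision reduces to a comparison against two thresholds. The sketch below isolates it in a pure function so that it can be checked without a cluster. scale_action_t and choose_action are hypothetical names, and the thresholds are passed as parameters because the values of the sample's threshold constants are not given in this tutorial.

```c
typedef enum { SCALE_NONE, SCALE_ADD, SCALE_RELEASE } scale_action_t;

/* Decides what the monitor should do for a given average load.
 * - above the upper threshold: request one more resource
 * - below the lower threshold with at least one host: release one
 * - otherwise: leave the allocation unchanged */
scale_action_t choose_action(double load, double upper, double lower,
                             int allocated_hosts)
{
    if (load > upper) {
        return SCALE_ADD;
    }
    if (load < lower && allocated_hosts > 0) {
        return SCALE_RELEASE;
    }
    return SCALE_NONE;
}
```

Keeping a gap between the two thresholds avoids adding and releasing a resource on alternate monitoring intervals when the load hovers around a single cut-off.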
void *monitor_thread_extended_fn(monitor_state_t *monitorP)
{
    struct timespec timeout;
    struct timeval now;
    int rc;
    while(!samples_shutdown) {
        // wait until change in host/container status is received
        pthread_mutex_lock(&monitorP->monitor_mutex);
        monitorP->ready = 1;
        gettimeofday(&now, NULL);
        timeout.tv_sec = now.tv_sec + 30;
        timeout.tv_nsec = now.tv_usec *1000;
        rc = pthread_cond_timedwait(&monitorP->monitor_cond,
                                    &monitorP->monitor_mutex, &timeout);
        // Currently no way to get container from id.
        //print_vem_container(vem_container_t *container);
        if (rc == ETIMEDOUT) {
            char **attrs = NULL;
            int numh = -1;
            vem_hostinfo_t *hinfo = NULL;
            double *loads = NULL;
            double load = computeAverageLoad(monitorP, &numh, &hinfo, &attrs, &loads);
            fprintf(stderr, "\n Monitor: Avg. Load =%6.2f\n", load);
            // update resources based on load
            int delta = 1;
            vem_allocreq_t *aloc_spec = get_alloc_spec();
            if(load > UPPER_THRESHOLD) {
                add_resources(delta, aloc_spec);
            } else if (load < LOWER_THRESHOLD && numh > 0) {
                release_resources(monitorP->vhandle, delta, aloc_spec, numh,
                                  hinfo, attrs, loads);
            }
            if(hinfo != NULL) {
                vem_free_hostinfo(hinfo, numh, attrs);
            }
            if(loads != NULL) {
                free(loads);
            }
        } else {
            fprintf(stderr, "Received Event?\n");
        }
        pthread_mutex_unlock(&monitorP->monitor_mutex);
    }
    fprintf(stderr, "MonitorThread Shutdown\n");
    return NULL;
}
Step 4: Release resources from Platform EGO
The release_resources() method determines the least loaded host and makes a request to Platform EGO to release this allocated resource.
Define an array of structures to hold the individual host loads and associated index. Use the qsort() method to sort the host loads in ascending order.
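Sorting load values together with their original positions is what lets the caller map the sorted order back to host entries. The following is a self-contained sketch of that technique; load_entry_t, compare_load, and rank_hosts_by_load are hypothetical names, and the allocation check is an addition that the sample listing does not perform.

```c
#include <stdlib.h>

typedef struct {
    double load;
    int index;   /* position of the host in the original array */
} load_entry_t;

/* qsort() comparator: ascending by load. */
static int compare_load(const void *a, const void *b)
{
    const load_entry_t *x = (const load_entry_t *)a;
    const load_entry_t *y = (const load_entry_t *)b;

    if (x->load < y->load) return -1;
    if (x->load > y->load) return 1;
    return 0;
}

/* Writes the host indices into order[], least loaded first.
 * Returns 0 on success, -1 on invalid input or allocation failure. */
int rank_hosts_by_load(const double *loads, int n, int *order)
{
    load_entry_t *entries;
    int i;

    if (loads == NULL || order == NULL || n < 0) {
        return -1;
    }
    if (n == 0) {
        return 0;
    }
    entries = calloc((size_t)n, sizeof(*entries));
    if (entries == NULL) {
        return -1;
    }
    for (i = 0; i < n; i++) {
        entries[i].load = loads[i];
        entries[i].index = i;
    }
    qsort(entries, (size_t)n, sizeof(*entries), compare_load);
    for (i = 0; i < n; i++) {
        order[i] = entries[i].index;
    }
    free(entries);
    return 0;
}
```

For loads of 0.7, 0.1, and 0.4, the resulting order is 1, 2, 0, so the host at index 1 would be the first candidate for release. Note that qsort() does not guarantee the relative order of hosts with equal loads.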
For each resource to be released, starting with the least loaded host, get the host name and set the number of slots (hard-coded to 1 in this sample).
Get the index of the least loaded host in the resource collection. The find_host_index() method looks for the matching host name in the resource collection, and if allocated, returns the index.
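Because the lookup returns -1 when no allocated host matches, a caller should check the result before using it as an array index. The sketch below shows one way to do that. host_table_t is a simplified, hypothetical stand-in for the sample's resource collection, allocation IDs are modeled as plain integers, and find_allocated_host and allocation_id_for are illustrative names only.

```c
#include <stddef.h>
#include <string.h>

enum { HOST_RELEASED = 0, HOST_ALLOCATED = 1 };

/* Hypothetical, simplified stand-in for the sample's resource collection. */
typedef struct {
    int num;             /* number of hosts in the arrays below */
    const char **names;  /* host names */
    int *state;          /* HOST_ALLOCATED or HOST_RELEASED per host */
    int *alloc_id;       /* allocation ID per host (modeled as int) */
} host_table_t;

/* Returns the index of the allocated host with the given name, or -1. */
int find_allocated_host(const host_table_t *t, const char *name)
{
    int i;

    if (t == NULL || name == NULL) {
        return -1;
    }
    for (i = 0; i < t->num; i++) {
        if (strcmp(name, t->names[i]) == 0 && t->state[i] == HOST_ALLOCATED) {
            return i;
        }
    }
    return -1;
}

/* Looks up the allocation ID for a host. The index is validated first, so a
 * missing or already released host yields -1 instead of an out-of-range
 * array access. On success, stores the ID in *id_out and returns 0. */
int allocation_id_for(const host_table_t *t, const char *name, int *id_out)
{
    int idx = find_allocated_host(t, name);

    if (idx < 0 || id_out == NULL) {
        return -1;
    }
    *id_out = t->alloc_id[idx];
    return 0;
}
```

A release loop built on this helper would skip any host for which allocation_id_for() returns -1 rather than issuing a release request for it.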
Define the release request structure with information such as the allocation ID of the host to release and the number of hosts and slots to release. Pass this structure to the vem_release() method, which causes Platform EGO to release the resource(s). If the release succeeds, the remove_monitor_resource() method searches the resource collection for the name of the released host. When a match is found, the host's allocation state is updated to released status, i.e., unallocated.
int release_resources(vem_handle_t* vhandle, int delta, vem_allocreq_t *alocreq,
                      int num_hosts, vem_hostinfo_t *hinfo, char **attrs,
                      double *loads)
{
    // which ones to release? Sort the load and release the ones that are least used
    // create a struct of load and index to sort
    int e, index, hindex, i;
    sort_element_t *element_array = calloc(num_hosts, sizeof(sort_element_t));
    for(e=0; e<num_hosts; e++) {
        element_array[e].load = loads[e];
        element_array[e].index = e;
    }
    qsort(element_array, num_hosts, sizeof(sort_element_t), sort_fn);
    vem_host_t host;
    vem_releasereq_t release;
    resource_collection_t *resources = monitorP->resources;
    for(i=0; i<delta; i++) {
        index = element_array[i].index;
        host.name = hinfo[index].name;
        host.slots = 1; // should be made generic
        hindex = find_host_index(resources, hinfo[index].name);
        release.allocId = resources->allocId[hindex];
        release.nhosts = 1;
        release.hosts = &host;
        release.flags = VEM_RELEASE_AUTOADJ;
        int rc = vem_release(vhandle, &release);
        if (rc < 0) {
            fprintf(stderr, "Error releasing resource: %s\n", vem_strerror(vemerrno));
        }
        //mark them as released in the monitor host list
        if(rc == 0) {
            remove_monitor_resource(monitorP, host.name);
        }
    }
    free(element_array);
    return 0;
}

int sort_fn(const void *e1, const void *e2)
{
    sort_element_t *d1 = (sort_element_t *)e1;
    sort_element_t *d2 = (sort_element_t *)e2;
    return (d1->load == d2->load) ? 0 : ( (d1->load < d2->load) ? -1 : +1);
}

int find_host_index(resource_collection_t *resources, char *hname)
{
    int i, index=-1;
    if (hname == NULL) return index;
    for(i=0; i<resources->num; i++) {
        if(!strcmp(hname, resources->names[i]) &&
           (resources->allocstate[i] == RESOURCE_ALLOCATED)) {
            return i;
        }
    }
    return index;
}

void remove_monitor_resource(monitor_state_t *monitorP, char *hostname)
{
    fprintf(stderr, "remove host\n");
    // assumes the caller acquired the lock
    int i;
    resource_collection_t *resources = monitorP->resources;
    if(hostname == NULL) {
        return;
    }
    for(i=0; i< resources->num; i++){
        if(!strcmp(hostname, resources->names[i]) &&
           (resources->allocstate[i] != RESOURCE_RELEASED)) {
            resources->allocstate[i] = RESOURCE_RELEASED;
        }
    }
}
Run the client application
- Select Run > Run.
The Run dialog appears.
- In the Configurations list, either select an EGO C Client Application or click New for a new configuration.
For a new configuration, enter the configuration name.
- Enter the project name and C/C++ Application name.
- Click Apply and then Run.
Sample Output
Date Modified: July 12, 2006
Platform Computing: www.platform.com
Platform Support: support@platform.com
Platform Information Development: doc@platform.com
Copyright © 1994-2006 Platform Computing Corporation. All rights reserved.