Main Page | Modules | Class List | Directories | File List | Class Members | File Members | Related Pages

buffering.h

00001 typedef struct dblib_buffer_row {
00003         TDSRESULTINFO *resinfo;
00005         unsigned char *row_data;
00007         DBINT row;
00009         TDS_INT *sizes;
00010 } DBLIB_BUFFER_ROW;
00011 
00012 static void buffer_struct_print(const DBPROC_ROWBUF *buf);
00013 static int buffer_save_row(DBPROCESS *dbproc);
00014 static DBLIB_BUFFER_ROW* buffer_row_address(const DBPROC_ROWBUF * buf, int idx);
00015 
00045 static int
00046 buffer_count(const DBPROC_ROWBUF *buf)
00047 {
00048         return (buf->head > buf->tail) ?
00049                 buf->head - buf->tail :                         /* |...TddddH....| */
00050                 buf->capacity - (buf->tail - buf->head);        /* |ddddH....Tddd| */
00051 }
00052  
00056 static int
00057 buffer_is_full(const DBPROC_ROWBUF *buf)
00058 {
00059         return buf->capacity == buffer_count(buf) && buf->capacity > 1;
00060 }
00061 
00062 static int
00063 buffer_index_valid(const DBPROC_ROWBUF *buf, int idx)
00064 {
00065         if (buf->tail <= buf->head)
00066                 if (buf->head <= idx && idx <= buf->tail)
00067                         return 1;
00068         
00069         if (0 <= idx && idx <= buf->head)
00070                 return 1;
00071         
00072         if (buf->tail <= idx && idx < buf->capacity)
00073                 return 1;
00074 #if 0   
00075         printf("buffer_index_valid: idx = %d\n", idx);
00076         buffer_struct_print(buf);
00077 #endif
00078         return 0;       
00079 }
00080 
00081 static void
00082 buffer_free_row(DBLIB_BUFFER_ROW *row)
00083 {
00084         if (row->sizes)
00085                 TDS_ZERO_FREE(row->sizes);
00086         if (row->row_data) {
00087                 tds_free_row(row->resinfo, row->row_data);
00088                 row->row_data = NULL;
00089         }
00090         tds_free_results(row->resinfo);
00091         row->resinfo = NULL;
00092 }
00093  
00094 /*
00095  * Buffer is freed at slightly odd points, whenever
00096  * capacity changes: 
00097  * 
00098  * 1. When setting capacity, to release prior buffer.  
00099  * 2. By dbresults.  When called the second time, it has to 
00100  * release prior storage because the new resultset will have
00101  * a different width.  
00102  * 3. By dbclose(), else open/close/open would leak.  
00103  */
00104 static void
00105 buffer_free(DBPROC_ROWBUF *buf)
00106 {
00107         if (buf->rows != NULL) {
00108                 int i;
00109                 for (i = 0; i < buf->capacity; ++i)
00110                         buffer_free_row(&buf->rows[i]);
00111                 TDS_ZERO_FREE(buf->rows);
00112         }
00113 }
00114 
00115 /*
00116  * When no rows are currently buffered (and the buffer is allocated)
00117  * set the indices to their initial postions.
00118  */
00119 static void
00120 buffer_reset(DBPROC_ROWBUF *buf)
00121 {
00122         buf->head = 0;
00123         buf->current = buf->tail = buf->capacity;
00124 }
00125 
00126 static int
00127 buffer_idx_increment(const DBPROC_ROWBUF *buf, int idx)
00128 {
00129         if (++idx >= buf->capacity) { 
00130                 idx = 0;
00131         }
00132         return idx;
00133 }
00134 
00139 static DBLIB_BUFFER_ROW*
00140 buffer_row_address(const DBPROC_ROWBUF * buf, int idx)
00141 {
00142         if (!(idx >= 0 && idx < buf->capacity)) {
00143                 printf("idx is %d:\n", idx);
00144                 buffer_struct_print(buf);
00145                 assert(idx >= 0);
00146                 assert(idx < buf->capacity);
00147         }
00148         
00149         return &(buf->rows[idx]);
00150 }
00151 
00155 static DBINT
00156 buffer_idx2row(const DBPROC_ROWBUF *buf, int idx)
00157 {
00158         return buffer_row_address(buf, idx)->row;
00159 }
00160 
00164 static int
00165 buffer_row2idx(const DBPROC_ROWBUF *buf, int row_number)
00166 {
00167         int i, ii, idx = -1;
00168         
00169         if (buf->tail == buf->capacity) {
00170                 assert (buf->head == 0);
00171                 return -1;      /* no rows buffered */
00172         }
00173         
00174         /* 
00175          * March through the buffers from tail to head, stop if we find our row.  
00176          * A full queue is indicated by tail == head (which means we can't write).
00177          */
00178         for (ii=0, i = buf->tail; i != buf->head || ii == 0; i = buffer_idx_increment(buf, i)) {
00179                 if( buffer_idx2row(buf, i) == row_number) {
00180                         idx = i;
00181                         break;
00182                 }
00183                 assert(ii++ < buf->capacity); /* prevent infinite loop */
00184         } 
00185         
00186         return idx;
00187 }
00188 
00194 static void
00195 buffer_delete_rows(DBPROC_ROWBUF * buf, int count)
00196 {
00197         int i;
00198 
00199         if (count < 0 || count > buffer_count(buf)) {
00200                 count = buffer_count(buf);
00201         }
00202 
00203         for (i=0; i < count; i++) {
00204                 if (buf->tail < buf->capacity)
00205                         buffer_free_row(&buf->rows[i]);
00206                 buf->tail = buffer_idx_increment(buf, buf->tail);
00207                 /* 
00208                  * If deleting rows from the buffer catches the tail to the head, 
00209                  * return to the initial postion.  Otherwise, it will look full.
00210                  */
00211                 if (buf->tail == buf->head) {
00212                         buffer_reset(buf);
00213                         break;
00214                 }
00215         }
00216 #if 0
00217         buffer_struct_print(buf);
00218 #endif
00219 }
00220 
00221 static void
00222 buffer_transfer_bound_data(DBPROC_ROWBUF *buf, TDS_INT res_type, TDS_INT compute_id, DBPROCESS * dbproc, int idx)
00223 {
00224 
00225         int i;
00226         TDSCOLUMN *curcol;
00227         TDSRESULTINFO *resinfo;
00228         int srctype;
00229         BYTE *src;
00230         int desttype;
00231         const DBLIB_BUFFER_ROW *row;
00232 
00233         assert(buffer_index_valid(buf, idx));
00234 
00235         row = buffer_row_address(buf, idx);
00236         assert(row->resinfo);
00237         resinfo = row->resinfo;
00238 
00239         for (i = 0; i < resinfo->num_cols; i++) {
00240                 DBINT srclen;
00241 
00242                 curcol = resinfo->columns[i];
00243                 if (row->sizes)
00244                         curcol->column_cur_size = row->sizes[i];
00245 
00246                 if (curcol->column_nullbind) {
00247                         if (curcol->column_cur_size < 0) {
00248                                 *(DBINT *)(curcol->column_nullbind) = -1;
00249                         } else {
00250                                 *(DBINT *)(curcol->column_nullbind) = 0;
00251                         }
00252                 }
00253                 if (!curcol->column_varaddr)
00254                         continue;
00255 
00256                 src = (row->row_data ? row->row_data : row->resinfo->current_row) + curcol->column_offset;
00257                 srclen = curcol->column_cur_size;
00258                 if (is_blob_type(curcol->column_type)) {
00259                         src = (BYTE *) ((TDSBLOB *) src)->textvalue;
00260                 }
00261                 desttype = _db_get_server_type(curcol->column_bindtype);
00262                 srctype = tds_get_conversion_type(curcol->column_type, curcol->column_size);
00263 
00264                 if (srclen < 0) {
00265                         _set_null_value((BYTE *) curcol->column_varaddr, desttype, curcol->column_bindlen);
00266                 } else {
00267                         copy_data_to_host_var(dbproc, srctype, src, srclen, desttype, 
00268                                                 (BYTE *) curcol->column_varaddr,  curcol->column_bindlen,
00269                                                          curcol->column_bindtype, curcol->column_nullbind);
00270                 }
00271         }
00272 
00273         /*
00274          * This function always bumps current.  Usually, it's called 
00275          * by dbnextrow(), so bumping current is a pretty obvious choice.  
00276          * It can also be called by dbgetrow(), but that function also 
00277          * causes the bump.  If you call dbgetrow() for row N, a subsequent
00278          * call to dbnextrow() yields N+1.  
00279          */
00280         buf->current = buffer_idx_increment(buf, buf->current);
00281 
00282 }       /* end buffer_transfer_bound_data()  */
00283 
00284 static void 
00285 buffer_struct_print(const DBPROC_ROWBUF *buf)
00286 {
00287         assert(buf);
00288 
00289         printf("\t%d rows in buffer\n",         buffer_count(buf));
00290         
00291         printf("\thead = %d\t",                 buf->head);
00292         printf("\ttail = %d\t",                 buf->tail);
00293         printf("\tcurrent = %d\n",              buf->current);
00294         printf("\tcapacity = %d\t",             buf->capacity);
00295         printf("\thead row number = %d\n",      buf->received);
00296 }
00297 
00298 /* * * Functions called only by public db-lib API take DBPROCESS* * */
00299 
00316 static int
00317 buffer_current_index(const DBPROCESS *dbproc)
00318 {
00319         const DBPROC_ROWBUF *buf = &dbproc->row_buf;
00320 #if 0
00321         buffer_struct_print(buf);
00322 #endif
00323         if (buf->capacity <= 1) /* no buffering */
00324                 return -1;
00325         if (buf->current == buf->head || buf->current == buf->capacity)
00326                 return -1;
00327                 
00328         assert(buf->current >= 0);
00329         assert(buf->current < buf->capacity);
00330         
00331         if( buf->tail < buf->head) {
00332                 assert(buf->tail < buf->current);
00333                 assert(buf->current < buf->head);
00334         } else {
00335                 if (buf->current > buf->head)
00336                         assert(buf->current > buf->tail);
00337         }
00338         return buf->current;
00339 }
00340 
00341 /*
00342  * Normally called by dbsetopt() to prepare for buffering
00343  * Called with nrows == 0 by dbopen to safely set buf->rows to NULL.  
00344  */
00345 static void
00346 buffer_set_capacity(DBPROCESS *dbproc, int nrows)
00347 {
00348         DBPROC_ROWBUF *buf = &dbproc->row_buf;
00349         
00350         buffer_free(buf);
00351 
00352         memset(buf, 0, sizeof(DBPROC_ROWBUF));
00353 
00354         if (0 == nrows) {
00355                 buf->capacity = 1;
00356                 return;
00357         }
00358 
00359         assert(0 < nrows);
00360 
00361         buf->capacity = nrows;
00362 }
00363 
00364 /*
00365  * Called only by dbresults(); capacity must be >= 1. 
00366  * Sybase's documents say dbresults() cannot return FAIL if the prior calls worked, 
00367  * which is a little strange, because (for FreeTDS, at least), dbresults
00368  * is when we learn about the result set's width.  Without that information, we
00369  * can't allocate memory for the buffer.  But if we *fail* to allocate memory, 
00370  * we're not to communicate it back to the caller?   
00371  */
00372 static void
00373 buffer_alloc(DBPROCESS *dbproc)
00374 {
00375         DBPROC_ROWBUF *buf = &dbproc->row_buf;
00376         
00377         /* Call this function only after setting capacity. */
00378 
00379         assert(buf);
00380         assert(buf->capacity > 0);
00381         assert(buf->rows == NULL);
00382         
00383         buf->rows = (DBLIB_BUFFER_ROW *) calloc(buf->capacity, sizeof(DBLIB_BUFFER_ROW));
00384         
00385         assert(buf->rows);
00386         
00387         buffer_reset(buf);
00388         
00389         buf->received = 0;
00390 }
00391 
00396 static int
00397 buffer_add_row(DBPROCESS *dbproc, TDSRESULTINFO *resinfo)
00398 {
00399         DBPROC_ROWBUF *buf = &dbproc->row_buf;
00400         DBLIB_BUFFER_ROW *row;
00401         int i;
00402 
00403         assert(buf->capacity >= 0);
00404 
00405         if (buffer_is_full(buf))
00406                 return -1;
00407 
00408         /* initial condition is head == 0 and tail == capacity */
00409         if (buf->tail == buf->capacity) {
00410                 /* bumping this tail will set it to zero */
00411                 assert(buf->head == 0);
00412                 buf->tail = buffer_idx_increment(buf, buf->tail);
00413         }
00414 
00415         row = buffer_row_address(buf, buf->head);
00416 
00417         /* bump the row number, write it, and move the data to head */
00418         if (row->resinfo) {
00419                 tds_free_row(row->resinfo, row->row_data);
00420                 tds_free_results(row->resinfo);
00421         }
00422         row->row = ++buf->received;
00423         ++resinfo->ref_count;
00424         row->resinfo = resinfo;
00425         row->row_data = NULL;
00426         if (row->sizes)
00427                 free(row->sizes);
00428         row->sizes = (TDS_INT *) calloc(resinfo->num_cols, sizeof(TDS_INT));
00429         for (i = 0; i < resinfo->num_cols; ++i)
00430                 row->sizes[i] = resinfo->columns[i]->column_cur_size;
00431 
00432         /* update current, bump the head */
00433         buf->current = buf->head;
00434         buf->head = buffer_idx_increment(buf, buf->head);
00435 
00436         return buf->current;
00437 }
00438 
00439 static int
00440 buffer_save_row(DBPROCESS *dbproc)
00441 {
00442         DBPROC_ROWBUF *buf = &dbproc->row_buf;
00443         DBLIB_BUFFER_ROW *row;
00444         int idx = buf->head - 1;
00445 
00446         if (buf->capacity <= 1)
00447                 return SUCCEED;
00448 
00449         if (idx < 0)
00450                 idx = buf->capacity - 1;
00451         if (idx >= 0 && idx < buf->capacity) {
00452                 row = &buf->rows[idx];
00453 
00454                 if (row->resinfo && !row->row_data) {
00455                         row->row_data = row->resinfo->current_row;
00456                         row->resinfo->current_row = tds_alloc_row(row->resinfo);
00457                 }
00458         }
00459 
00460         return SUCCEED;
00461 }
00462 

Generated on Sat Jul 1 11:11:45 2006 for FreeTDS API by  doxygen 1.4.1