Main Page | Modules | Class List | Directories | File List | Class Members | File Members | Related Pages

flow_cache.c

Go to the documentation of this file.
00001 /**
00002  * @file   flow_cache.c
00003  * @author Chris Green <cmg@sourcefire.com>
00004  * @date   Fri Jun 20 09:04:51 2003
00005  * 
00006  * @brief  where flows are stored
00007  * 
00008  * The FlowCache is a memory-capped storage area for the FLOW
00009  * datatype.  It is inspired by spp_conversation, ipaudit, stream4 and
00010  * frag2. 
00011  *
00012  *
00013  * Each FLOW is uniquely identified by the 5-tuple (ipproto,sip,dip,sport,dport)
00014  *
00015  * Currently we only support IPV4 but new protocols will only require
00016  * the addition of additional protocol specific hash tables.  Ideally,
00017  * the API will stay the same and just do a switch on the key type to
00018  * support the various address families.
00019  *
00020  * This is meant to centralize the state management routines so that
00021  * it's easy to just worry about higher level protocols in other
00022  * modules.
00023  *
00024  * This is built on top of sfxhash currently and relies on it for
00025  * memory management. flow_hash.c contains the hashing keys 
00026  * 
00027  */
00028 #define FLOW_PERF_FIX
00029 
00030 #ifdef HAVE_CONFIG_H
00031 #include "config.h"
00032 #endif
00033 
00034 #include <stdio.h>
00035 
00036 #include "flow_cache.h"
00037 #include "flow_callback.h"
00038 #include "flow_hash.h"
00039 
00040 /* helper functions */
00041 static int flowcache_anrfree( void *key, void *data);
00042 static int flowcache_usrfree( void *key, void *data);
00043 
00044 /* minor optimization functions */
00045 static INLINE int flowcache_mru(FLOWCACHE *flowcachep, FLOW **flowpp);
00046 static INLINE int flowcache_lru(FLOWCACHE *flowcachep, FLOW **flowpp);
00047 
00048 /* statistics functions */
00049 static INLINE int FCS_find(FLOWCACHE *flowcachecp, FLOWKEY *keyp);
00050 static INLINE int FCS_revfind(FLOWCACHE *flowcachecp, FLOWKEY *keyp);
00051 static INLINE int FCS_new(FLOWCACHE *flowcachecp, FLOWKEY *keyp);
00052 static INLINE int FCS_find_success(FLOWCACHE *flowcachecp, FLOWKEY *keyp);
00053 static INLINE int FCS_find_fail(FLOWCACHE *flowcachecp, FLOWKEY *keyp);
00054 
00055 int flowcache_init(FLOWCACHE *flowcachep, unsigned int rows,
00056                    int memcap, int datasize, FLOWHASHID hashid)
00057 {
00058     int ret;
00059     int real_datasize = 0;
00060     
00061     if(!flowcachep)
00062     {
00063         return FLOW_ENULL;
00064     }
00065 
00066     if(datasize < 0)
00067     {
00068         return FLOW_ENULL;
00069     }
00070 
00071     if(memcap <= (datasize + sizeof(FLOW) + sizeof(SFXHASH_NODE)))
00072     {
00073         /* come on man, you gotta give me enough memory to store 1. */
00074         return FLOW_EINVALID;
00075     }
00076 
00077     if(rows < 1)
00078         return FLOW_EINVALID;
00079 
00080     /* zero out the struct for all the additional data strctures */
00081     memset(flowcachep, 0, sizeof(FLOWCACHE));
00082 
00083     /*
00084     **  If we have a datasize, then we need to decrement by 1 because
00085     **  the FLOWDATA already has one byte.
00086     */
00087     if(datasize)
00088     {
00089         real_datasize = datasize - 1;
00090     }
00091 
00092     /*
00093     **  datasize-1 because there is already 1 byte in the FLOWDATA
00094     **  structure.
00095     */
00096     flowcachep->ipv4_table =
00097         sfxhash_new(rows,                         /* # of nodes in HT*/
00098                     sizeof(FLOWKEY),              /* size of the key  */
00099                     sizeof(FLOW) + real_datasize, /* data size */
00100                     memcap,                       /* max memory */
00101                     1,                            /* auto recover nodes */
00102                     flowcache_anrfree,            /* autorecovery function */
00103                     flowcache_usrfree,            /* data free function*/
00104                     1);                           /* recycle old nodes */
00105 
00106     if(flowcachep->ipv4_table == NULL)
00107     {
00108         return FLOW_ENOMEM;
00109     }
00110 
00111     /* set our hash function to something that understands ipv4 flowkeys */
00112     switch(hashid)
00113     {
00114     case HASH1:
00115         ret = sfxhash_set_keyops(flowcachep->ipv4_table,
00116                                  flowkey_hashfcn1,
00117                                  flowkeycmp_fcn);
00118         break;
00119     case HASH2:
00120         ret = sfxhash_set_keyops(flowcachep->ipv4_table,
00121                                  flowkey_hashfcn2,
00122                                  flowkeycmp_fcn);
00123         break;
00124     default:
00125         ret = FLOW_EINVALID;
00126     }
00127 
00128     /* if setting the hash function or setting the comparison function fails,
00129        abort */
00130     if(ret != 0)
00131     {
00132         sfxhash_delete(flowcachep->ipv4_table);
00133         return FLOW_BADJUJU;
00134     }
00135 
00136     flowcachep->max_flowbits_bytes = (unsigned int)datasize;
00137 
00138     return FLOW_SUCCESS;
00139 }
00140 
00141 int flowcache_destroy(FLOWCACHE *flowcachep)
00142 {
00143     if(!flowcachep)
00144     {
00145         return FLOW_ENULL;
00146     }
00147 
00148     sfxhash_delete(flowcachep->ipv4_table);
00149 
00150     flowcachep->ipv4_table = NULL;
00151     
00152     return FLOW_SUCCESS;
00153 }
00154 
00155 unsigned flowcache_overhead_blocks(FLOWCACHE *fcp)
00156 {
00157     return sfxhash_overhead_blocks(fcp->ipv4_table);
00158 }
00159 
00160 
00161 int flowcache_releaseflow(FLOWCACHE *flowcachep, FLOW **flowpp)
00162 {
00163     FLOWKEY *key;
00164     FLOWKEY search_key;
00165     
00166     if(!flowcachep || !flowpp || !(*flowpp))
00167     {
00168         return FLOW_ENULL;
00169     }
00170 
00171     /** @todo remove any associated data with the flow */
00172 
00173     key = &(*flowpp)->key;
00174     
00175     flowkey_normalize(&search_key, key);
00176 
00177     if(sfxhash_remove(flowcachep->ipv4_table, &search_key) != 0)
00178     {
00179         return FLOW_NOTFOUND;
00180     }
00181 
00182     /* we've successfully removed the node from the table */
00183     
00184     *flowpp = NULL;
00185 
00186     return FLOW_SUCCESS;
00187 }
00188 
00189 int init_flowdata(FLOWCACHE *fcp, FLOW *flowp)
00190 {
00191     if(!flowp || !fcp)
00192         return 1;
00193 
00194     if(boInitStaticBITOP(&(flowp->data.boFlowbits), fcp->max_flowbits_bytes, 
00195                          flowp->data.flowb))
00196     {
00197         return 1;
00198     }
00199 
00200     return 0;
00201 }
00202 
00203 int flowcache_newflow(FLOWCACHE *flowcachep, FLOWKEY *keyp, FLOW **flowpp)
00204 {
00205     static int run_once = 1;
00206 #ifdef FLOW_PERF_FIX
00207     FLOW *newflow = NULL;
00208     SFXHASH_NODE *new_node = NULL;
00209 #else
00210     static FLOW zeroflow;
00211 #endif
00212     static FLOWKEY searchkey;
00213     int ret;
00214     
00215     if(!flowcachep || !keyp || !flowpp)
00216     {
00217         return FLOW_ENULL;
00218     }
00219 
00220     FCS_new(flowcachep, keyp);
00221     
00222     if(run_once)
00223     {
00224         /* all the time that we're running this, we're actually going
00225            to be filling in the key, and having zero'd out counters */ 
00226 #ifndef FLOW_PERF_FIX
00227         memset(&zeroflow, 0, sizeof(FLOW));
00228 #endif
00229         memset(&searchkey, 0, sizeof(FLOWKEY));        
00230         run_once = 0;
00231     }
00232 
00233     flowkey_normalize(&searchkey, keyp);
00234    
00235 #ifdef FLOW_PERF_FIX
00236     /* This just eliminates a memcpy. */
00237     /* Since we're using auto node recovery, we should get a node back
00238      * here that has a data pointer. */
00239     /* flow_init resets the internal key & stats to zero. */
00240     new_node = sfxhash_get_node(flowcachep->ipv4_table, &searchkey);
00241     if (new_node && new_node->data)
00242     {
00243         newflow = new_node->data;
00244     
00245         if(flow_init(newflow, keyp->protocol,
00246                      keyp->init_address, keyp->init_port,
00247                      keyp->resp_address, keyp->resp_port))
00248         {
00249             return FLOW_ENULL;
00250         }
00251         ret = SFXHASH_OK;
00252     }
00253     else
00254     {
00255         ret = SFXHASH_NOMEM;
00256     }
00257 #else
00258     if(flow_init(&zeroflow, keyp->protocol,
00259                  keyp->init_address, keyp->init_port,
00260                  keyp->resp_address, keyp->resp_port))
00261     {
00262         return FLOW_ENULL;
00263     }
00264 
00265     ret = sfxhash_add(flowcachep->ipv4_table, &searchkey, &zeroflow);
00266 #endif
00267 
00268     switch(ret)
00269     {
00270     case SFXHASH_OK:
00271         if(flowcache_mru(flowcachep,flowpp) != FLOW_SUCCESS)
00272         {
00273             /* something's wrong because we just added this thing!\n */
00274             flow_printf("Unable to find a key I just added!\n");
00275             return FLOW_BADJUJU;
00276         }
00277 
00278         if(init_flowdata(flowcachep, *flowpp))
00279         {
00280             return FLOW_BADJUJU;
00281         }
00282 
00283         return FLOW_SUCCESS;
00284         
00285     case SFXHASH_NOMEM:
00286         return FLOW_ENOMEM;
00287 
00288     case SFXHASH_INTABLE:
00289     default:
00290         return FLOW_EINVALID;
00291     }
00292 }
00293 
00294 /** 
00295  * Get the most recently used flow from the cache
00296  * 
00297  * @param flowcachep flow cache to operate on
00298  * @param flowp where to put the flow
00299  * 
00300  * @return FLOW_SUCCESS on sucess
00301  */
00302 static INLINE int flowcache_mru(FLOWCACHE *flowcachep, FLOW **flowpp)
00303 {
00304     if(!flowcachep  || !flowpp)
00305         return FLOW_EINVALID;
00306 
00307     *flowpp = sfxhash_mru(flowcachep->ipv4_table);
00308 
00309     if(*flowpp == NULL)
00310         return FLOW_NOTFOUND;
00311     
00312     return FLOW_SUCCESS;
00313 }
00314 
00315 /** 
00316  * Get the least recently used flow from the cache
00317  * 
00318  * @param flowcachep flow cache to operate on
00319  * @param flowp where to put the flow
00320  * 
00321  * @return FLOW_SUCCESS on sucess
00322  */
00323 static INLINE int flowcache_lru(FLOWCACHE *flowcachep, FLOW **flowpp)
00324 {
00325     if(!flowcachep  || !flowpp)
00326         return FLOW_EINVALID;
00327 
00328     *flowpp = sfxhash_lru(flowcachep->ipv4_table);
00329 
00330     if(*flowpp == NULL)
00331         return FLOW_NOTFOUND;
00332     
00333     return FLOW_SUCCESS;
00334 }
00335 
00336 /** 
00337  * Look for the data in the flow tables.
00338  * 
00339  * @param flowcachep cache to look in
00340  * @param keyp pointer to searching key data
00341  * @param flowpp pointer to set with this module
00342  * @param direction pass back argument (FROM_INITIATOR or FROM_RESPONDER)
00343  * 
00344  * @return FLOW_SUCCESS on success, FLOW_NOTFOUND when not found, else usage error
00345  */
00346 int flowcache_find(FLOWCACHE *flowcachep, FLOWKEY *keyp, FLOW **flowpp, int *direction)
00347 {
00348     FLOWKEY search_key;
00349     FLOW *fp;
00350 
00351     int way;
00352     
00353     if(!flowcachep || !keyp || !flowpp || !direction)
00354     {
00355         return FLOW_ENULL;
00356     }
00357     
00358     FCS_find(flowcachep, keyp);
00359 
00360     /* give us a single search key that we can hash on */
00361     flowkey_normalize(&search_key, keyp);
00362 
00363     fp = sfxhash_find(flowcachep->ipv4_table, &search_key);
00364     
00365     if(fp == NULL)
00366     {
00367         /* we have failed. Nothing else to do here */
00368         *flowpp = NULL;
00369 
00370         FCS_find_fail(flowcachep, keyp);
00371         
00372         return FLOW_NOTFOUND;
00373     }
00374     else 
00375     {
00376         /* now, lets see which way this flow was stored.  Note, this
00377            has nothing to do with the search key as that is only good
00378            for searching */
00379 
00380         if(fp->key.init_address == keyp->init_address &&
00381            fp->key.init_port == keyp->init_port)
00382         {
00383             way = FROM_INITIATOR;
00384         }
00385         else
00386         {
00387             way = FROM_RESPONDER;
00388             FCS_revfind(flowcachep, &search_key);
00389         }
00390     }
00391 
00392     *direction = way;
00393 
00394     *flowpp = fp;
00395 
00396     FCS_find_success(flowcachep, keyp);
00397     return FLOW_SUCCESS;
00398 }
00399 
00400 /** 
00401  * map a position to a name
00402  * 
00403  * @param position where to return the name of
00404  * 
00405  * @return string reprenting position name
00406  */
00407 const char *flowcache_pname(FLOW_POSITION position)
00408 {
00409     static const char *position_names[] = {"FLOW_NEW",
00410                                            "FLOW_FIRST_BIDIRECTIONAL",
00411                                            "FLOW_ADDITIONAL",
00412                                            "FLOW_SHUTDOWN",
00413                                            "FLOW_INVALID" };
00414 
00415     if(position < FLOW_NEW || position >= FLOW_MAX)
00416     {
00417         return position_names[5];
00418     }
00419 
00420     return position_names[position];
00421 }
00422 
00423 
00424 /** 
00425  * Automatically recover nodes and make sure that all the other
00426  * references are taken care of.
00427  * 
00428  * @param key hash key
00429  * @param data ptr to FLOW data
00430  * 
00431  * @return 0 if this node can be removed
00432  */
00433 static int flowcache_anrfree(void *key, void *data)
00434 {
00435     FLOW *fp;
00436 
00437     if(data)
00438     {
00439         fp = (FLOW *) data;
00440         flow_callbacks(FLOW_SHUTDOWN, fp, 0, NULL);
00441     }
00442     
00443     return 0;
00444 }
00445 
00446 
00447 /** 
00448  * Automatically recover nodes and make sure that all the other
00449  * references are taken care of.
00450  * 
00451  * @param key hash key
00452  * @param data ptr to FLOW data
00453  * p
00454  * @return 0 if this node can be removed
00455  */
00456 static int flowcache_usrfree(void *key, void *data)
00457 {
00458 #ifndef WIN32
00459     // printf("DEBUG: called %s\n", __func__);
00460 #else
00461     // printf("DEBUG: called file %s line %d\n", __FILE__, __LINE__);
00462 #endif
00463 
00464     return 0;
00465 }
00466 
00467 
00468 /* these are just helpers for the flowcache problems */
00469 
00470 
00471 static INLINE int FCS_find(FLOWCACHE *flowcachecp, FLOWKEY *keyp)
00472 {
00473     
00474     flowcachecp->total.find_ops++;
00475     flowcachecp->per_proto[keyp->protocol].find_ops++;
00476         
00477     return 0;
00478 }
00479 
00480 static INLINE int FCS_revfind(FLOWCACHE *flowcachecp, FLOWKEY *keyp)
00481 {
00482     flowcachecp->total.reversed_ops++;
00483     flowcachecp->per_proto[keyp->protocol].reversed_ops++;
00484     return 0;
00485 }
00486 
00487 static INLINE int FCS_new(FLOWCACHE *flowcachecp, FLOWKEY *keyp)
00488 {
00489     flowcachecp->total.new_flows++;
00490     flowcachecp->per_proto[keyp->protocol].new_flows++;
00491     return 0;
00492 }
00493 
00494 static INLINE int FCS_find_success(FLOWCACHE *flowcachecp, FLOWKEY *keyp)
00495 {
00496     flowcachecp->total.find_success++;
00497     flowcachecp->per_proto[keyp->protocol].find_success++;
00498     return 0;
00499 }
00500 
00501 static INLINE int FCS_find_fail(FLOWCACHE *flowcachecp, FLOWKEY *keyp)
00502 {
00503     flowcachecp->total.find_fail++;
00504     flowcachecp->per_proto[keyp->protocol].find_fail++;
00505     return 0;
00506 }
00507 
00508 void flowcache_stats(FILE *stream, FLOWCACHE *flowcachep)
00509 {
00510     int could_hold;
00511     int i;
00512     time_t low_time = 0, high_time = 0, diff_time = 0;    
00513     int diff_hours = 0, diff_min = 0, diff_sec = 0;
00514     int diff_blocks = 0;
00515     FLOW *flow_mrup, *flow_lrup;
00516 
00517 #ifdef INDEPTH_DEBUG
00518     printf("table max depth: %u\n",
00519            sfxhash_maxdepth(flowcachep->ipv4_table));
00520 #endif /* INDEPTH_DEBUG */
00521         
00522     if((flowcache_mru(flowcachep, &flow_mrup) == FLOW_SUCCESS) &&
00523        (flowcache_lru(flowcachep, &flow_lrup) == FLOW_SUCCESS))
00524     {
00525         low_time = flow_lrup->stats.last_packet;
00526         high_time = flow_mrup->stats.last_packet;
00527 
00528         diff_time = high_time - low_time;
00529 
00530         diff_hours = diff_time / 3600;
00531         diff_min = (diff_time - (3600 * diff_hours)) / 60;
00532         diff_sec = diff_time % 60;
00533     }
00534 
00535     diff_blocks = flowcachep->ipv4_table->mc.nblocks -
00536         flowcache_overhead_blocks(flowcachep);
00537 
00538 
00539     //could_hold = flowcachep->ipv4_table->mc.memcap /
00540     //    (sizeof(FLOW) + sizeof(FLOWKEY) + sizeof(SFXHASH_NODE));
00541 
00542     /* this is a bad calculation -- should clean this up */
00543     if(diff_blocks > 0)
00544     {
00545         could_hold = (flowcachep->ipv4_table->mc.memused - flowcache_overhead_bytes(flowcachep)) / diff_blocks;
00546         could_hold = flowcachep->ipv4_table->mc.memcap / could_hold;
00547     }
00548     else
00549     {
00550         could_hold = diff_blocks;
00551     }
00552 
00553     flow_printf(",----[ FLOWCACHE STATS ]----------\n");
00554     flow_printf("Memcap: %u Overhead Bytes %u used(%%%lf)/blocks (%u/%u)\nOverhead blocks: %u Could Hold: (%u)\n",
00555                 flowcachep->ipv4_table->mc.memcap,
00556                 flowcache_overhead_bytes(flowcachep),
00557                 calc_percent(flowcachep->ipv4_table->mc.memused,
00558                              flowcachep->ipv4_table->mc.memcap),
00559                 flowcachep->ipv4_table->mc.memused,
00560                 flowcachep->ipv4_table->mc.nblocks,
00561                 flowcache_overhead_blocks(flowcachep),
00562                 could_hold);
00563     
00564     flow_printf("IPV4 count: %u frees: %u\nlow_time: %u, high_time: %u,"
00565                 " diff: %dh:%02d:%02ds\n",
00566                 sfxhash_count(flowcachep->ipv4_table),
00567                 sfxhash_anr_count(flowcachep->ipv4_table),
00568                 (unsigned) low_time,
00569                 (unsigned) high_time,
00570                 diff_hours,diff_min,diff_sec);
00571     
00572 
00573     flow_printf("    finds: %llu reversed: %llu(%%%lf) \n    find_sucess: %llu "
00574                 "find_fail: %llu\npercent_success: (%%%lf) new_flows: %llu\n",
00575                 flowcachep->total.find_ops,
00576                 flowcachep->total.reversed_ops,
00577                 calc_percent(flowcachep->total.reversed_ops,
00578                              flowcachep->total.find_ops),
00579                 flowcachep->total.find_success,
00580                 flowcachep->total.find_fail,
00581                 calc_percent(flowcachep->total.find_success,
00582                              flowcachep->total.find_ops),
00583                 flowcachep->total.new_flows);
00584     
00585     for(i=0;i<256;i++)
00586     {
00587         if(flowcachep->per_proto[i].find_ops > 0)
00588         {
00589             flow_printf(" Protocol: %d (%%%lf)\n"
00590                         "   finds: %llu\n"
00591                         "   reversed: %llu(%%%lf)\n"
00592                         "   find_sucess: %llu\n"
00593                         "   find_fail: %llu\n"
00594                         "   percent_success: (%%%lf)\n"
00595                         "   new_flows: %llu\n",
00596                         i,
00597                         calc_percent(flowcachep->per_proto[i].find_ops,
00598                                      flowcachep->total.find_ops),
00599                         flowcachep->per_proto[i].find_ops,
00600                         flowcachep->per_proto[i].reversed_ops,
00601                         calc_percent(flowcachep->per_proto[i].reversed_ops,
00602                                      flowcachep->per_proto[i].find_ops),
00603                         flowcachep->per_proto[i].find_success,
00604                         flowcachep->per_proto[i].find_fail,
00605                         calc_percent(flowcachep->per_proto[i].find_success,
00606                                      flowcachep->per_proto[i].find_ops),
00607                         flowcachep->per_proto[i].new_flows);
00608         }
00609     }
00610 }
00611 
00612 /** 
00613  * get the row count
00614  * 
00615  * @param sbp flowcache ptr to return the memcap of
00616  * 
00617  * @return nrows or -1
00618  */
00619 int flowcache_row_count(FLOWCACHE *sbp)
00620 {
00621     if(sbp != NULL && sbp->ipv4_table != NULL)        
00622         return sbp->ipv4_table->nrows;
00623 
00624     return -1;            
00625 }
00626 
00627 /** 
00628  * get the overhead # of bytes
00629  * 
00630  * @param sbp flowcache ptr to return the memcap of
00631  * 
00632  * @return nrows or -1
00633  */
00634 
00635 int flowcache_overhead_bytes(FLOWCACHE *sbp)
00636 {
00637     if(sbp != NULL && sbp->ipv4_table != NULL)
00638         return sfxhash_overhead_bytes(sbp->ipv4_table);
00639 
00640     return -1;            
00641 
00642 }

Generated on Sun May 14 14:51:15 2006 by  doxygen 1.4.2