28 #define RCDAQEVENTITERATOR 1
29 #define FILEEVENTITERATOR 2
30 #define TESTEVENTITERATOR 3
34 #define LISTEVENTITERATOR 7
39 #if defined(SunOS) || defined(Linux) || defined(OSF1)
49 COUT <<
"** usage: dpipe -s -d -v -w -n -i -z -l -x source destination" << std::endl;
50 COUT <<
" dpipe -h for more help" << std::endl;
56 COUT <<
"** cannot specify both -e and -c!" << std::endl;
57 COUT <<
" type dpipe -h for more help" << std::endl;
63 COUT <<
"** cannot specify both -z and -l!" << std::endl;
64 COUT <<
" type dpipe -h for more help" << std::endl;
71 COUT <<
" dpipe reads events from one source and writes it to a destination. The source can" << std::endl;
72 COUT <<
" be any of the standard sources (ET pool, file, filelist, or test stream). The destination " << std::endl;
73 COUT <<
" can be a file, a ET pool, an AML server, or no destination at all (you cannot write to a test " << std::endl;
74 COUT <<
" stream). Like with a Unix pipe, you can move events through a chain of sources " << std::endl;
75 COUT <<
" and destinations, for example, from one ET pool into another, or into a file. " << std::endl;
77 COUT <<
" While the events move through dpipe, you can have them identify themselves. If the " << std::endl;
78 COUT <<
" destination is null, this is a simple way to sift through a stream of events " << std::endl;
79 COUT <<
" and look at their identification messages. " << std::endl;
81 COUT <<
" You can throttle the data flow with the -w (wait) option, and you can stop after" << std::endl;
82 COUT <<
" a given number of events with the -n option. " << std::endl;
84 COUT <<
" In order to write events from a ET pool called ONLINE to a file (d.evt), use" << std::endl;
86 COUT <<
" > dpipe -s etpool -d file ONLINE d.evt" << std::endl;
88 COUT <<
" if you want to see which events are coming, add -i:" << std::endl;
90 COUT <<
" > dpipe -s etpool -d file -i ONLINE d.evt" << std::endl;
92 COUT <<
" If the output is a aml server, specify the destination as hostname:port: " << std::endl;
93 COUT <<
" > dpipe -s f -d a -i filename phnxbox4.phenix.bnl.gov:8900" << std::endl << std::endl;
94 COUT <<
" Note that you can abbreviate the etpool, file, listfile, and Test to d, f, l, and T." << std::endl;
95 COUT <<
" > dpipe -s etpool -d file ONLINE d.evt" << std::endl;
96 COUT <<
" is equivalent to " << std::endl;
97 COUT <<
" > dpipe -s d -d f ONLINE d.evt" << std::endl;
99 COUT <<
" List of options: " << std::endl;
100 COUT <<
" -s [d or f or l or T] source is et pool, file, listfile, or Test stream" << std::endl;
101 COUT <<
" -b <size in MB> in case you write to a file, specify the buffer size (default 4MB)" << std::endl;
102 COUT <<
" -d [d or f or a or n] destination is et pool or file , aml server or nothing" << std::endl;
103 COUT <<
" -v verbose" << std::endl;
104 COUT <<
" -w (time in milliseconds> wait time interval (in ms) between events to throttle the data flow" << std::endl;
105 COUT <<
" -e <event number> start from event number" << std::endl;
106 COUT <<
" -c <number> get nth event (-e gives event with number n)" << std::endl;
107 COUT <<
" -n <number> stop after so many events" << std::endl;
108 COUT <<
" -i have each event identify itself" << std::endl;
109 COUT <<
" -z gzip-compress each output buffer" << std::endl;
110 COUT <<
" -l LZO-compress each output buffer" << std::endl;
111 COUT <<
" -x sharedlibrary.so load a plugin that can select events" << std::endl;
112 COUT <<
" -h this message" << std::endl << std::endl;
131 if ( filter )
delete filter;
138 if ( filter && T == filter )
154 int sourcetype =
DFILE;
155 int destinationtype =
ETPOOL;
156 int waitinterval = 0;
161 int gzipcompress = 0;
171 int buffer_size = 256*1024*4 ;
181 while ((c = getopt(argc, argv,
"e:b:c:s:d:n:w:x:vhizl")) != EOF)
192 buffer_size = buffer_size*256*1024;
210 else if ( *
optarg ==
'f' ) destinationtype =
DFILE;
211 else if ( *
optarg ==
'n' ) destinationtype =
DNULL;
212 else if ( *
optarg ==
'a' ) destinationtype =
OAML;
241 voidpointer = dlopen(
optarg, RTLD_GLOBAL | RTLD_NOW);
244 std::cout <<
"Loading of the filter library "
245 <<
optarg <<
" failed: " << dlerror() << std::endl;
248 if (filter) std::cout <<
" Filter \"" << filter->
idString() <<
"\" registered" << std::endl;
267 while ( (chOpt = GetOption(argc, argv,
"e:b:c:s:d:n:w:vhiz", &pszParam) ) >1)
275 if ( !sscanf(pszParam,
"%d", &eventnumber) )
exitmsg();
280 if ( !sscanf(pszParam,
"%d", &buffer_size) )
exitmsg();
281 buffer_size = buffer_size*256*1024;
286 if ( !sscanf(pszParam,
"%d", &countnumber) )
exitmsg();
299 if ( *pszParam ==
'f' ) destinationtype =
DFILE;
300 else if ( *pszParam ==
'n' ) destinationtype =
DNULL;
313 if ( !sscanf(pszParam,
"%d", &waitinterval) )
exitmsg();
318 if ( !sscanf(pszParam,
"%d", &maxevents) )
exitmsg();
376 COUT <<
"Could not open input stream" << std::endl;
385 if ( destinationtype ==
DNULL )
390 else if ( destinationtype ==
DFILE)
392 buffer =
new PHDWORD [buffer_size];
396 fd =
open (argv[
optind+1], O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
397 S_IRWXU | S_IROTH | S_IRGRP );
401 COUT <<
"Could not open file: " << argv[
optind+1] << std::endl;
406 ob =
new ogzBuffer (fd, buffer, buffer_size);
408 else if ( lzocompress)
410 ob =
new olzoBuffer (fd, buffer, buffer_size);
414 ob =
new ophBuffer (fd, buffer, buffer_size);
420 else if ( destinationtype ==
OAML)
422 buffer =
new PHDWORD [buffer_size];
429 COUT <<
"invalid destination" << std::endl;
440 while ( ( maxevents == 0 || eventnr < maxevents) &&
454 if ( countnumber && count < countnumber)
459 if ( (! filter) || filter->
select(evt) )
462 if ( destinationtype ==
DFILE || destinationtype ==
OAML )
468 COUT <<
"Error writing events " << std::endl;
478 if (waitinterval > 0) usleep(waitinterval*1000);
480 if (waitinterval > 0) Sleep(waitinterval);
486 if ( destinationtype ==
DFILE )
496 else if ( destinationtype ==
OAML )
506 #if defined(SunOS) || defined(Linux) || defined(OSF1)