ECCE @ EIC Software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
oamlBuffer.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file oamlBuffer.cc
1 
2 #include "oamlBuffer.h"
3 #include "BufferConstants.h"
4 
5 #include <unistd.h>
6 #include <netdb.h>
7 #include <cstring>
8 #include <arpa/inet.h>
9 #include <netinet/in.h>
10 #include <sys/socket.h>
11 
12 #if defined(SunOS) || defined(OSF1)
13 #include <strings.h>
14 #endif
15 
16 
17 // the constructor first ----------------
18 oamlBuffer::oamlBuffer ( const char * host, const int port, PHDWORD * where,
19  const int length,
20  const int irun,
21  const int iseq):
22  olzoBuffer(0,where,length,irun,iseq)
23 {
24  good_object = 1;
25 
26  has_begun = 0; // we will defer the connection until we have the first buffer to write
27 
28 
29  ThePort = port;
30  strcpy(HostName, host);
31 
32  if ( connect_aml() ) good_object =0;
33 
35  RunNumber = irun;
36  if ( irun)
37  {
39  }
40 }
41 
42 
43 oamlBuffer::oamlBuffer ( const char * hostport, PHDWORD * where,
44  const int length,
45  const int irun,
46  const int iseq):
47  olzoBuffer(0,where,length,irun,iseq)
48 {
49  good_object = 1;
50 
51  has_begun = 0; // we will defer the connection until we have the first buffer to write
52 
53  char *t = new char[strlen(hostport)+1];
54  strcpy ( t, hostport);
55 
56  char *token = strtok(t, ":");
57  if (token)
58  {
59  strcpy( HostName, token);
60  token = strtok(0, ":");
61  if (token)
62  {
63  sscanf(token, "%d", &ThePort);
64  }
65  }
66 
67  delete [] t;
68 
69  if ( connect_aml() ) good_object =0;
70 
72  RunNumber = irun;
73  if ( irun)
74  {
76  }
77 
78 }
79 
80 
82 {
83 
84  struct sockaddr_in server_addr;
85  struct hostent *p_host;
86  p_host = gethostbyname(HostName);
87 
88  if ( ! p_host) return -3;
89 
90  bzero( (char*) &server_addr, sizeof(server_addr) );
91  server_addr.sin_family = AF_INET;
92  bcopy(p_host->h_addr, &(server_addr.sin_addr.s_addr), p_host->h_length);
93  server_addr.sin_port = htons(ThePort);
94 
95 
96  if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
97  {
98  std::cout << "error in socket" << std::endl;
99  good_object = 0;
100  return -1;
101  }
102 
103  int xs = 512*1024;
104 
105  int s = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF,
106  &xs, 4);
107  if (s) std::cout << "setsockopt status = " << s << std::endl;
108 
109  if ( connect(sockfd, (struct sockaddr*) &server_addr
110  , sizeof(server_addr)) < 0 )
111  {
112  std::cout << "error in connect" << std::endl;
113  good_object = 0;
114  return -1;
115  }
116 
117  return 0;
118 }
119 
121 {
122  int i;
123  int controlword = htonl(CTRL_BEGINRUN);
124  int hrun;
125  if ( DontOverrideRunNumber )
126  {
127  hrun = htonl(RunNumber); // this is our own, cached value from the constructor
128  }
129  else
130  {
131  hrun = htonl(runnumber); // this is the one which the oBuffer parent maintains
132  }
133  writen (sockfd,(char *) &controlword, 4);
134  writen (sockfd,(char *) &hrun, 4);
135  readn (sockfd, (char *) &i, 4);
136  has_begun = 1;
137 
138  return 0;
139 
140 }
141 
142 // ----------------------------------------------------------
143 // returns the number of bytes written, including record wasted space.
144 //
146 {
147 
148  int cstatus = 0;
149 
150  if (! dirty) return 0;
151 
152  if ( ! good_object) return 1;
153  if ( ! has_begun ) cstatus = begin_run();
154 
155  if ( cstatus ) return cstatus;
156 
157  if (! has_end) addEoB();
158 
159  lzo_uint outputlength_in_bytes = outputarraylength*4-16;
160  lzo_uint in_len = bptr->Length;
161 
162  lzo1x_1_12_compress( (lzo_byte *) bptr,
163  in_len,
164  (lzo_byte *)&outputarray[4],
165  &outputlength_in_bytes,wrkmem);
166 
167 
168  outputarray[0] = outputlength_in_bytes +4*BUFFERHEADERLENGTH;
170  outputarray[2] = bptr->Bufseq;
171  outputarray[3] = bptr->Length;
172 
173 
174  int controlword = htonl(CTRL_DATA);
175  // std::cout << __LINE__ << " sent control word" << std::endl;
176  writen (sockfd,(char *) &controlword, 4);
177  int i = htonl(outputarray[0]);
178  writen (sockfd,(char *) &i, 4);
179  writen (sockfd,(char *)outputarray, outputarray[0] );
180 
181  // std::cout << __LINE__ << " waiting for ack" << std::endl;
182  readn (sockfd, (char *) &i, 4);
183  // std::cout << __LINE__ << " got it" << std::endl;
184 
185 
186 
187  dirty = 0;
189  return 0;
190 }
191 
192 
193 // ----------------------------------------------------------
195 {
196  int wstatus = writeout();
197 
198  if ( ! wstatus)
199  {
200 
201  int i;
202  int controlword = htonl(CTRL_ENDRUN);
203  writen (sockfd, (char *)&controlword, 4);
204 
205 
206  readn (sockfd, (char *) &i, 4);
207 
208  controlword = htonl(CTRL_CLOSE);
209  writen (sockfd, (char *)&controlword, 4);
210  }
211  dirty = 0;
212  close (sockfd);
213 
214 }
215 
216 int oamlBuffer::readn (int fd, char *ptr, int nbytes)
217 {
218  int nleft, nread;
219  nleft = nbytes;
220  while ( nleft>0 )
221  {
222  nread = read (fd, ptr, nleft);
223  if ( nread < 0 )
224  return nread;
225  else if (nread == 0)
226  break;
227  nleft -= nread;
228  ptr += nread;
229  }
230  return (nbytes-nleft);
231 }
232 
233 
234 int oamlBuffer::writen (int fd, char *ptr, int nbytes)
235 {
236  int nleft, nwritten;
237  nleft = nbytes;
238  while ( nleft>0 )
239  {
240  nwritten = write (fd, ptr, nleft);
241  if ( nwritten < 0 )
242  return nwritten;
243 
244  nleft -= nwritten;
245  ptr += nwritten;
246  }
247  return (nbytes-nleft);
248 }