Write data to a "pipe"

Hi,
I would greatly appreciate help with editing some code. The provided code writes binary samples from a usrp radio to a .dat file. I am trying to edit the code to write to a "pipe (or fifo)" rather than a file but have not had much luck. The code is provided below. Thank you!

#include <uhd/types/tune_request.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/safe_main.hpp>
#include <uhd/usrp/multi_usrp.hpp>
#include <uhd/exception.hpp>
#include <boost/program_options.hpp>
#include <boost/format.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <fstream>
#include <csignal>
#include <complex>

namespace po = boost::program_options;

static bool stop_signal_called = false;
void sig_int_handler(int){stop_signal_called = true;}

template<typename samp_type> void recv_to_file(
uhd::usrp::multi_usrp::sptr usrp,
const std::string &cpu_format,
const std::string &wire_format,
const std::string &file,
size_t samps_per_buff,
unsigned long long num_requested_samples,
double time_requested = 0.0,
bool bw_summary = false,
bool stats = false,
bool null = false,
bool enable_size_map = false,
bool continue_on_bad_packet = false
){
unsigned long long num_total_samps = 0;
//create a receive streamer
uhd::stream_args_t stream_args(cpu_format,wire_format);
uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args);

uhd::rx_metadata_t md;
std::vector<samp_type> buff(samps_per_buff);
std::ofstream outfile;
if (not null)
outfile.open(file.c_str(), std::ofstream::binary);
bool overflow_message = true;

//setup streaming
uhd::stream_cmd_t stream_cmd((num_requested_samples == 0)?
uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS:
uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE
);
stream_cmd.num_samps = num_requested_samples;
stream_cmd.stream_now = true;
stream_cmd.time_spec = uhd::time_spec_t();
rx_stream->issue_stream_cmd(stream_cmd);

boost::system_time start = boost::get_system_time();
unsigned long long ticks_requested = (long)(time_requested * (double)boost::posix_time::time_duration::ticks_per_second());
boost::posix_time::time_duration ticks_diff;
boost::system_time last_update = start;
unsigned long long last_update_samps = 0;

typedef std::map<size_t,size_t> SizeMap;
SizeMap mapSizes;

while(not stop_signal_called and (num_requested_samples != num_total_samps or num_requested_samples == 0)){
boost::system_time now = boost::get_system_time();

size_t num_rx_samps = rx_stream->recv(&buff.front(), buff.size(), md, 3.0, enable_size_map);

if (enable_size_map){
SizeMap::iterator it = mapSizes.find(num_rx_samps);
if (it == mapSizes.end())
mapSizes[num_rx_samps] = 0;
mapSizes[num_rx_samps] += 1;
}

num_total_samps += num_rx_samps;

if (outfile.is_open())
outfile.write((const char*)&buff.front(), num_rx_samps*sizeof(samp_type));

if (bw_summary){
last_update_samps += num_rx_samps;
boost::posix_time::time_duration update_diff = now - last_update;
if (update_diff.ticks() > boost::posix_time::time_duration::ticks_per_second()) {
double t = (double)update_diff.ticks() / (double)boost::posix_time::time_duration::ticks_per_second();
double r = (double)last_update_samps / t;
std::cout << boost::format("\t%f Msps") % (r/1e6) << std::endl;
last_update_samps = 0;
last_update = now;
}
}

ticks_diff = now - start;
if (ticks_requested > 0){
if ((unsigned long long)ticks_diff.ticks() > ticks_requested)
break;
}
}

if (outfile.is_open())
outfile.close();

if (stats){
std::cout << std::endl;

double t = (double)ticks_diff.ticks() / (double)boost::posix_time::time_duration::ticks_per_second();
std::cout << boost::format("Received %d samples in %f seconds") % num_total_samps % t << std::endl;
double r = (double)num_total_samps / t;
std::cout << boost::format("%f Msps") % (r/1e6) << std::endl;

if (enable_size_map) {
std::cout << std::endl;
std::cout << "Packet size map (bytes: count)" << std::endl;
for (SizeMap::iterator it = mapSizes.begin(); it != mapSizes.end(); it++)
std::cout << it->first << ":\t" << it->second << std::endl;
}
}
}

typedef boost::function<uhd::sensor_value_t (const std::string&)> get_sensor_fn_t;

bool check_locked_sensor(std::vector<std::string> sensor_names, const char* sensor_name, get_sensor_fn_t get_sensor_fn, double setup_time){
if (std::find(sensor_names.begin(), sensor_names.end(), sensor_name) == sensor_names.end())
return false;

boost::system_time start = boost::get_system_time();
boost::system_time first_lock_time;

std::cout << boost::format("Waiting for \"%s\": ") % sensor_name;
std::cout.flush();

while (true){
if ((not first_lock_time.is_not_a_date_time()) and
(boost::get_system_time() > (first_lock_time + boost::posix_time::seconds(setup_time))))
{
std::cout << " locked." << std::endl;
break;
}

if (get_sensor_fn(sensor_name).to_bool()){
if (first_lock_time.is_not_a_date_time())
first_lock_time = boost::get_system_time();
std::cout << "+";
std::cout.flush();
}
else{
first_lock_time = boost::system_time(); //reset to 'not a date time'

if (boost::get_system_time() > (start + boost::posix_time::seconds(setup_time))){
std::cout << std::endl;
throw std::runtime_error(str(boost::format("timed out waiting for consecutive locks on sensor \"%s\"") % sensor_name));
}

std::cout << "_";
std::cout.flush();
}

boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}

std::cout << std::endl;

return true;
}

int UHD_SAFE_MAIN(int argc, char *argv[]){
uhd::set_thread_priority_safe();

//variables to be set by po
std::string args, file, type, ant, subdev, ref, wirefmt;
size_t total_num_samps, spb;
double rate, freq, gain, bw, total_time, setup_time;

//setup the program options
po::options_description desc("Allowed options");
desc.add_options()
("help", "help message")
("args", po::value<std::string>(&args)->default_value(""), "multi uhd device address args")
("file", po::value<std::string>(&file)->default_value("usrp_samples.dat"), "name of the file to write binary samples to")
("type", po::value<std::string>(&type)->default_value("short"), "sample type: double, float, or short")
("nsamps", po::value<size_t>(&total_num_samps)->default_value(0), "total number of samples to receive")
("time", po::value<double>(&total_time)->default_value(0), "total number of seconds to receive")
("spb", po::value<size_t>(&spb)->default_value(10000), "samples per buffer")
("rate", po::value<double>(&rate)->default_value(1e6), "rate of incoming samples")
("freq", po::value<double>(&freq)->default_value(0.0), "RF center frequency in Hz")
("gain", po::value<double>(&gain), "gain for the RF chain")
("ant", po::value<std::string>(&ant), "daughterboard antenna selection")
("subdev", po::value<std::string>(&subdev), "daughterboard subdevice specification")
("bw", po::value<double>(&bw), "daughterboard IF filter bandwidth in Hz")
("ref", po::value<std::string>(&ref)->default_value("internal"), "waveform type (internal, external, mimo)")
("wirefmt", po::value<std::string>(&wirefmt)->default_value("sc16"), "wire format (sc8 or sc16)")
("setup", po::value<double>(&setup_time)->default_value(1.0), "seconds of setup time")
("progress", "periodically display short-term bandwidth")
("stats", "show average bandwidth on exit")
("sizemap", "track packet size and display breakdown on exit")
("null", "run without writing to file")
("continue", "don't abort on a bad packet")
("skip-lo", "skip checking LO lock status")
("int-n", "tune USRP with integer-N tuning")
;
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);

#define recv_to_file_args(format) \
(usrp, format, wirefmt, file, spb, total_num_samps, total_time, bw_summary, stats, null, enable_size_map, continue_on_bad_packet)
//recv to file
if (type == "double") recv_to_file<std::complex<double> >recv_to_file_args("fc64");
else if (type == "float") recv_to_file<std::complex<float> >recv_to_file_args("fc32");
else if (type == "short") recv_to_file<std::complex<short> >recv_to_file_args("sc16");
else throw std::runtime_error("Unknown type " + type);

//finished
std::cout << std::endl << "Done!" << std::endl << std::endl;

return EXIT_SUCCESS;
}
Isn't is simply a case of asking the operating system to pipe the standard output? In terms of the code, that means instead of writing to a file you need to send the data to the standard output std::cout etc.

You may need to separate out any extraneous information messages and direct them to a different output (such as a log file).

http://www.linfo.org/pipes.html
Topic archived. No new replies allowed.