/* ts_merge.cc
*
* Concatenate .ts files with overlap into a contiguous stream
*
* (c) Kurt Garloff <kurt@garloff.de>, 10/2013
* License: GNU GPL v2 or v3 (at your option)
*/
#include "ts_fmt.h"
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <sys/time.h>
#include <signal.h>
#include <vector>
#include <list>
#include <typeinfo>
#include "ts_stream.h"
#include <fstream>
#include "reader.h"
#include "ts_sblock.h"
/* Globals required by headers */
/* debug: debug level (-d); verbose: verbosity level (-v, also raised by -d) */
int debug, verbose;
/* Interrupt counter: incremented by breakhandler() (installed for SIGINT,
 * SIGTERM and SIGQUIT in main()) and polled by readinput(), writefile()
 * and main().
 * NOTE(review): volatile sig_atomic_t would be the portable type for a
 * variable written from a signal handler; kept as volatile int since the
 * headers presumably declare it this way — confirm before changing. */
volatile int intr = 0;
/* Options */
/* -g: only keep fragments whose stream_des lists this program ID (-1: off) */
int progfilter = -1;
/* -t: only keep fragments with this transport stream ID (-1: off) */
int tsidfilter = -1;
/* --- */
using namespace std;
/* Here's the data structures we create:
 * (a) a list of extents with their local stream properties (stream_des)
 * (b) list of streams by seeing whether the stream_des can be merged
 */
/* (a): fragments accepted by may_append(); sorted and consumed (erased)
 * by consolidate() */
list<stream_block> sblocks;
/* may_append(): decide whether a freshly parsed fragment is usable and,
 * if so, queue a copy of it at the end of sblocks.
 * A fragment is rejected when
 *  - it has no TS ID or no PCR,
 *  - it does not match the -t TS ID filter, or
 *  - (-g given) it does not list the selected program, does not contain
 *    that program's PID, or that PID has zero length.
 * sb.sdes.finalize() runs for every fragment that has a TS ID and a PCR,
 * before the filters are applied.
 * Returns 1 if the fragment was queued, 0 otherwise. */
int may_append(stream_block& sb)
{
	const bool unusable = sb.sdes.tsid == (uint16_t)-1
			   || sb.sdes.first_pcr == INVALID_PCR;
	if (unusable)
		return 0;
	sb.sdes.finalize();

	bool accept = (tsidfilter == -1 || sb.sdes.tsid == tsidfilter);
	if (accept && progfilter != -1) {
		if (sb.sdes.programs.count(progfilter) == 0) {
			accept = false;
		} else {
			/* PID recorded for the selected program */
			const int pid = sb.sdes.programs.find(progfilter)->second;
			accept = sb.sdes.pids.count(pid) != 0
			      && sb.sdes.pids.find(pid)->second.len != 0;
		}
	}
	if (!accept)
		return 0;

	sblocks.push_back(sb);
	return 1;
}
/* Cleared by the caller to start a new progress sequence (e.g. a new input
 * file); progress() sets it after its first call. */
static int prog_notfirst;
/* progress(): print a single, carriage-return terminated status line for
 * the input currently being scanned.
 * @prog_off: bytes processed so far, relative to the start of the scan
 * @rd:       reader of the input (file name, and total length if tot_len==0)
 * @tot_len:  number of bytes to scan in total (0: use rd.inlen)
 * @st_off:   file offset of the scan start; only used to print the
 *            absolute position
 * The first call of a sequence (prog_notfirst == 0) only records the start
 * time and offset and resets the speed estimate. Later calls print the
 * position, the percentage done, a smoothed recent speed, the average
 * speed, an ETA and the number of fragments queued so far.
 */
void progress(off_t prog_off, const reader &rd, size_t tot_len = 0, const size_t st_off = 0)
{
	static const char *wdays[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
	static struct timeval tv0, tv1, tv2;
	static off_t lastoff, firstoff;
	static double lastspeed = 0;
	if (tot_len == 0)
		tot_len = rd.inlen;
	gettimeofday(&tv2, 0);
	if (prog_notfirst) {
		/* seconds since the first call and since the previous call */
		const double tot_secs = tv2.tv_sec-tv0.tv_sec+1e-6*(tv2.tv_usec-tv0.tv_usec);
		const double int_secs = tv2.tv_sec-tv1.tv_sec+1e-6*(tv2.tv_usec-tv1.tv_usec);
		const double avgspeed = tot_secs > 0? prog_off/(1048576*tot_secs): 0;
		/* Exponential moving average of the interval speed in MB/s with
		 * weight 1/8 (8388608 = 8*1048576). Skipped for an empty
		 * interval to avoid a division by zero. */
		if (int_secs > 0) {
			lastspeed = lastspeed*7/8 + (prog_off-lastoff)/(8388608*int_secs);
			/* First interval: no history yet, use the raw speed */
			if (lastoff == firstoff)
				lastspeed *= 8;
		}
		/* ETA from a blend of average and recent speed */
		const double speed = 7*avgspeed/8 + lastspeed/8;
		double left = (double)tot_len - (double)prog_off;
		if (left < 0)
			left = 0;
		unsigned remain = 0;
		if (speed > 0) {
			const double secs = left/1048576/speed + 0.9;
			/* clamp: converting an out-of-range double to unsigned is UB */
			remain = secs < 4e9? (unsigned)secs: 4000000000U;
		}
		const time_t finish = tv2.tv_sec+remain;
		const struct tm *tmtm = localtime(&finish);
		const double pct = tot_len? 100.0*prog_off/tot_len: 100.0;
		if (tmtm)
			printf("%s: @ %lliMB (%4.1f%%) (%5.1fMB/s, avg %5.1fMB/s, ETA: %s %02i:%02i:%02i %s) %zu frags \r",
			       rd.iname, (long long)((prog_off+(off_t)st_off)/1048576), pct, lastspeed, avgspeed,
			       wdays[tmtm->tm_wday], tmtm->tm_hour, tmtm->tm_min, tmtm->tm_sec, *tzname,
			       sblocks.size());
		fflush(stdout);
	} else {
		/* New sequence: forget the speed of the previous input, otherwise
		 * the "*8" bootstrap above would inflate it 7-fold. */
		firstoff = prog_off;
		tv0 = tv2;
		lastspeed = 0;
	}
	tv1 = tv2;
	lastoff = prog_off;
	prog_notfirst = 1;
}
/* readinput(): scan one input file for TS fragments and offer each one to
 * may_append().
 * @iname:   input file name
 * @spos:    byte offset at which to start scanning
 * @maxlen:  scan at most this many bytes from spos (0: up to end of file)
 * @pcrbegin, @pcrend: PCR range passed through to stream_block::read_cut()
 *           (INVALID_PCR when main() got no -r range)
 * Returns the number of fragments accepted by may_append(), or 0 if the
 * reader could not open the file.
 * On an interrupt (intr) only the scan of this file is stopped; intr is
 * cleared here, so main() continues with the next input and the output.
 * NOTE(review): confirm that swallowing the interrupt is intended.
 */
int readinput(const char* iname, off_t spos, off_t maxlen,
const float pcrbegin, const float pcrend)
{
int cnt = 0;
reader rd(iname, spos);
if (rd.infd == -1)
return 0;
/* Clip the reader to spos+maxlen, or compute the remaining length */
if (maxlen && spos + maxlen < (off_t)rd.inlen)
rd.inlen = spos + maxlen;
else
maxlen = rd.inlen - spos;
stream_block sb(iname);
off_t start, prev = spos;
/* start a new progress sequence for this file */
prog_notfirst = 0;
progress(0, rd, maxlen, spos);
bool sbset = false;
/* read_cut() returns the next position to read from, or -1 to stop */
for (start = spos; -1 != (start = sb.read_cut(start, rd, pcrbegin, pcrend)); prev = start) {
//for (start = spos; -1 != (start = sb.read(start, rd)); prev = start) {
/* From now on report progress relative to where the first fragment starts */
if (!sbset) {
spos = sb.srcoff;
sbset = true;
}
/* Interrupted: rewind to the previous position, clear intr and stop;
 * the fragment held in sb is still offered after the loop */
if (intr) {
start = prev;
intr = 0;
break;
}
/* read_cut() advanced: offer the fragment in sb and start a new one.
 * Otherwise skip ahead by fsblocksize (presumably past data that could
 * not be parsed — TODO confirm). */
if (start != prev) {
cnt += may_append(sb);
sb.reset();
} else
start += fsblocksize;
/* refresh the progress line whenever a 256 MiB (1<<28) boundary is crossed */
if (start>>28 != prev>>28)
progress(start-spos, rd, maxlen, spos);
}
/* Offer the fragment still held in sb */
cnt += may_append(sb);
/* start is -1 if read_cut() ended the loop; then use the end of sb */
progress(start == -1? sb.size+sb.srcoff-spos: start-spos, rd, maxlen, spos);
printf("\n");
return cnt;
}
/* A reassembled transport stream: the description of the merged stream
 * plus the ordered list of source extents that make up its contents. */
struct ts_file {
	stream_des sdes;		/* properties of the merged stream */
	list<stream_block> fragments;	/* source extents, in output order */
};
/* The single output stream filled by consolidate() */
ts_file merged_file;
/* consolidate(): sort the queued fragments (sblocks) and merge them into
 * merged_file, erasing each fragment from sblocks once it is handled.
 * For each fragment, in sorted order:
 *  - the first one initializes merged_file;
 *  - fragments with a different TS ID are reported and dropped;
 *  - if pcr_diff(last PCR so far, fragment's first PCR) == 0 the fragment
 *    is appended unchanged ("trivial concatenation");
 *  - if it is > 0 the fragments overlap: the previous fragment and the new
 *    one are both trimmed at the PCR midpoint of the overlap, then the new
 *    one is appended ("cutting concatenation");
 *  - otherwise a hole is reported and the fragment is dropped, leaving
 *    merged_file unchanged.
 * If a cut position cannot be located, the affected fragment is kept
 * untrimmed (possibly duplicating a little data) instead of being
 * corrupted with a -1 offset.
 */
void consolidate()
{
	if (verbose)
		cout << "Sorting ... " << flush;
	sblocks.sort();
	if (verbose)
		cout << "Merging ...\n" << flush;
	merged_file.sdes.len = 0;
	for (list<stream_block>::iterator it = sblocks.begin(); it != sblocks.end(); it = sblocks.erase(it)) {
		if (debug)
			cerr << *it << endl;
		if (merged_file.sdes.tsid == INVALID_TSID) {
			/* First fragment initializes the merged stream.
			 * NOTE(review): len is taken from it->sdes.len here but is
			 * grown by it->size below; confirm the two agree. */
			merged_file.sdes = it->sdes;
			merged_file.fragments.push_back(*it);
			if (verbose)
				cerr << "Init stream " << it->sdes.tsid << " ln " << dec << merged_file.sdes.len << " " << it->size << endl;
			continue;
		}
		if (merged_file.sdes.tsid != it->sdes.tsid) {
			cerr << "Can not merge " << *it << endl;
			continue;
		}
		/* The sign of the PCR difference selects the merge mode */
		const double diff = pcr_diff(merged_file.sdes.last_pcr, it->sdes.first_pcr);
		if (diff == 0) {
			/* Trivial concatenation */
			if (verbose)
				cerr << "Trivial merge at " << it->sdes.first_pcr << " ln " << merged_file.sdes.len + it->size << endl;
			merged_file.sdes.last_pcr = it->sdes.last_pcr;
			merged_file.sdes.len += it->size;
			merged_file.fragments.push_back(*it);
		} else if (diff > 0) {
			/* Cutting concatenation: cut both at the overlap midpoint */
			double mid = (merged_file.sdes.last_pcr+it->sdes.first_pcr)/2;
			/* Handle wraparound */
			if (merged_file.sdes.last_pcr < it->sdes.first_pcr) {
				mid = (MAX_PCR + merged_file.sdes.last_pcr + it->sdes.first_pcr)/2;
				if (mid > MAX_PCR)
					mid -= MAX_PCR;
			}
			if (verbose)
				cerr << "Cutting merge at " << merged_file.sdes.last_pcr << "/" << it->sdes.first_pcr << ":" << mid << endl;
			/* Prev block: drop everything after mid */
			stream_block& lastsb = merged_file.fragments.back();
			reader rd1(lastsb.srcnm, lastsb.srcoff);
			const off_t cut1 = lastsb.find_pcr(rd1, mid);
			if (cut1 == (off_t)-1) {
				cerr << "Could not find pos " << mid << " in prev. stream!" << endl;
				/* keep the previous fragment untrimmed */
			} else {
				merged_file.sdes.len -= (lastsb.size+lastsb.srcoff-cut1);
				if (debug)
					cerr << "PCR " << mid << "@" << dec << cut1 << ": cut off " << lastsb.size+lastsb.srcoff-cut1
					     << " from " << lastsb.size << " at end -> ln " << merged_file.sdes.len << endl;
				lastsb.size = cut1 - lastsb.srcoff;
				lastsb.sdes.last_pcr = mid;
			}
			/* Next block: drop everything before mid */
			reader rd2(it->srcnm, it->srcoff);
			const off_t cut2 = it->find_pcr(rd2, mid);
			if (cut2 == (off_t)-1) {
				cerr << "Could not find pos " << mid << " in next stream!" << endl;
				/* keep the next fragment untrimmed */
			} else {
				if (debug)
					cerr << "PCR " << mid << "@" << dec << cut2 << ": cut off " << cut2-it->srcoff
					     << " from " << it->size << " at start -> ln " << merged_file.sdes.len + (it->size-cut2+it->srcoff) << endl;
				it->sdes.first_pcr = mid;
				it->size -= (cut2 - it->srcoff);
				it->srcoff = cut2;
			}
			/* Now append */
			merged_file.sdes.len += it->size;
			merged_file.sdes.last_pcr = it->sdes.last_pcr;
			merged_file.fragments.push_back(*it);
		} else {
			/* hole: the fragment is reported and dropped */
			cerr << "Hole at " << merged_file.sdes.last_pcr << "/" << it->sdes.first_pcr << endl;
		}
	}
}
#define CBUFSZ 65536
unsigned char copybuf[CBUFSZ];
/* writefile(): write the fragments of tsf, in order, into a new file.
 * The output is created exclusively (O_EXCL); if outnm exists, outnm.000,
 * outnm.001, ... are tried instead. Space for tsf.sdes.len bytes is
 * preallocated on a best-effort basis. Each fragment's source range
 * [srcoff, srcoff+size) is copied through copybuf. On a failed write the
 * user is asked whether to retry. An interrupt (intr) stops the copy after
 * the current fragment.
 * Returns 0 on success (also for an interrupted copy), -1 on error.
 */
int writefile(const char* outnm, const ts_file& tsf)
{
	//string onm = (strcmp(dirnm, ".")? string(dirnm)+"/"+tsf.sdes.name(): tsf.sdes.name());
	string onm = outnm;
	const string oonm = onm;
	int ofd = -1;
	int ctr = 0;
	while (ofd == -1) {
		ofd = open(onm.c_str(), O_RDWR | O_CREAT | O_EXCL, 0644);
		if (ofd == -1) {
			if (errno == EEXIST) {
				/* big enough for any int: a truncated suffix would
				 * repeat the same name forever past .999 */
				char digit[16];
				snprintf(digit, sizeof(digit), "%03i", ctr++);
				onm = oonm + "." + digit;
			} else {
				fprintf(stderr, "Could not open %s for writing: %s\n",
					onm.c_str(), strerror(errno));
				return -1;
			}
		}
	}
	printf("Writing ~%lliMB to %s ...\n",
		(long long)((tsf.sdes.len+524288)/1048576), onm.c_str());
	if (fallocate64(ofd, 1, 0, tsf.sdes.len) && verbose)
		fprintf(stderr, "Warn: fallocate64 %s failed: %s\n", onm.c_str(), strerror(errno));
	int ifd = -1; const char* inm = 0;
	for (list<stream_block>::const_iterator it = tsf.fragments.begin(); it != tsf.fragments.end(); ++it) {
		if (inm != it->srcnm) {
			if (ifd != -1)
				close(ifd);
			inm = it->srcnm;
			ifd = open(inm, O_RDONLY);
			if (ifd == -1) {
				fprintf(stderr, "Could not open %s for reading: %s\n", inm, strerror(errno));
				close(ofd);
				return -1;
			}
		}
		size_t wr = 0;
		while (wr < it->size) {
			const size_t tord = it->size - wr;
			/* keep pread/write results signed so that -1 is detected */
			const ssize_t rd = pread(ifd, copybuf, CBUFSZ<tord? CBUFSZ: tord, it->srcoff+wr);
			if (rd <= 0) {
				fprintf(stderr, "Read from %s failed: %s\n", inm,
					rd? strerror(errno): "unexpected end of file");
				close(ifd); close(ofd);
				return -1;
			}
			ssize_t ww;
			while ((ww = write(ofd, copybuf, rd)) <= 0) {
				fprintf(stderr, "Write to %s failed: %s\n", onm.c_str(), strerror(errno));
				fprintf(stderr, "Retry(Y/N) ? "); fflush(stderr);
				const int c = getchar();
				/* swallow the rest of the answer line */
				for (int d = c; d != '\n' && d != EOF; d = getchar())
					;
				if (c != 'Y' && c != 'y') {
					close(ifd); close(ofd);
					return -1;
				}
			}
			/* a short write is fine: the next pread restarts at srcoff+wr */
			wr += ww;
		}
		if (intr) {
			--intr;
			break;
		}
	}
	if (ifd != -1)
		close(ifd);
	close(ofd);
	return 0;
}
/* usage(): print the command line help to stderr and exit with status 1. */
void usage()
{
	fprintf(stderr, "Usage: ts_merge [options] INFILE1 [INFILE2 [...]]\n");
	fprintf(stderr, " options: -d increase debug level\n");
	fprintf(stderr, " -v verbose\n");
	fprintf(stderr, " -g progid only consider fragments with program ID progid\n");
	fprintf(stderr, " -t tsid only consider fragments with TS ID tsid\n");
	fprintf(stderr, " -o output output reassembled stream into output file\n");
	fprintf(stderr, " -s startpos start at offset startpos in infile(s)\n");
	fprintf(stderr, " -S 1stspos start at offset 1stspos in first infile (overrides -s)\n");
	fprintf(stderr, " -m maxsearch end after maxsearch bytes in infile(s)\n");
	fprintf(stderr, " -p pcrjump allow up to pcrjump secs discont in PCR [def=1.6]\n");
	fprintf(stderr, " -r range only output range (format: start-end)\n");
	fprintf(stderr, " -h print this help\n");
	fprintf(stderr, " sizes for -s/-S/-m accept suffixes b (512), k, M, G, T\n");
	//fprintf(stderr, " -c comrng cut out commercial breaks (format: see -r)\n");
	exit(1);
}
/* readint(): parse a size argument (used for -s, -S and -m).
 * The number is read with scanf's %i conversion, so a 0x prefix selects
 * hex and a leading 0 selects octal. An optional one-character suffix
 * scales it: b (512-byte blocks), k (KiB), M (MiB), G (GiB), T (TiB).
 * An unknown suffix is reported on stderr and ignored.
 * Returns 0 for an empty or non-numeric string.
 */
size_t readint(char* str)
{
	long long num = 0;
	char c = ' ';	/* treated as "no suffix" if sscanf never reaches one */
	const size_t len = strlen(str);
	if (len == 0)
		return 0;
	/* cast: isdigit() is undefined for negative char values */
	if (isdigit((unsigned char)str[len-1])) {
		sscanf(str, "%lli", &num);
		return (size_t)num;
	}
	sscanf(str, "%lli%c", &num, &c);
	size_t val = (size_t)num;
	switch (c) {
	case 'b': val *= 512; break;
	case 'k': val *= 1024; break;
	case 'M': val *= (1024*1024); break;
	case 'G': val *= (1024*1024*1024); break;
	case 'T': val *= (1024*1024*1024*1024ULL); break;
	case ' ': break;
	default: fprintf(stderr, "Unknown suffix \"%c\", returning %zi\n", c, val);
	}
	return val;
}
/* breakhandler(): handler for SIGINT/SIGTERM/SIGQUIT.
 * Restores the default action, so a second signal terminates the program.
 * Reports the signal on stderr and increments intr, which the reading,
 * writing and main code poll to stop early.
 * Only async-signal-safe calls (signal(), write()) are used. fprintf() is
 * not safe in a handler: it can deadlock or corrupt stdio state if the
 * signal arrives while the program is inside stdio, e.g. while printing
 * progress.
 */
static void breakhandler(int sig)
{
	signal(sig, SIG_DFL);
	static const char head[] = "\nReceived signal ";
	char msg[sizeof(head) + 16];
	size_t len = 0;
	for (const char *p = head; *p; ++p)
		msg[len++] = *p;
	/* decimal digits of sig, least significant first */
	char digits[12];
	int nd = 0;
	unsigned long v = sig < 0? -(unsigned long)sig: (unsigned long)sig;
	do {
		digits[nd++] = (char)('0' + v % 10);
		v /= 10;
	} while (v);
	if (sig < 0)
		msg[len++] = '-';
	while (nd)
		msg[len++] = digits[--nd];
	msg[len++] = ' ';
	msg[len++] = '\n';
	ssize_t ret = write(STDERR_FILENO, msg, len);
	(void)ret;	/* nothing sensible to do on failure inside a handler */
	++intr;
}
int main(int argc, char *argv[])
{
char *outfile = 0, *rngstr = 0;
off_t spos = 0, firstspos = (off_t)-1, maxlen = 0;
float pcrbegin = INVALID_PCR, pcrend = INVALID_PCR;
int c;
while (-1 != (c = getopt(argc, argv, "hdvt:g:o:s:S:m:p:r:"))) {
switch(c) {
case 'h': usage(); break;
case 'd': ++debug; ++verbose; break;
case 'v': ++verbose; break;
case 'o': outfile = optarg; break;
case 'g': progfilter = atoi(optarg); break;
case 't': tsidfilter = atoi(optarg); break;
case 's': spos = readint(optarg); break;
case 'S': firstspos = readint(optarg); break;
case 'm': maxlen = readint(optarg); break;
case 'p': pcrtol = atof(optarg); break;
case 'r': rngstr = optarg; break;
case ':': fprintf(stderr, "Missing arg to option -%c\n", c); usage(); break;
case '?': fprintf(stderr, "Uknown option -%c\n", optopt); usage(); break;
}
}
if (optind == argc)
usage();
if (rngstr) {
if (sscanf(rngstr, "%f-%f", &pcrbegin, &pcrend) != 2) {
fprintf(stderr, "ERROR: Failed to parse range %s\n", rngstr);
exit(2);
}
}
signal(SIGINT, breakhandler);
signal(SIGTERM, breakhandler);
signal(SIGQUIT, breakhandler);
off_t tspos = (firstspos == (off_t)-1)? spos: firstspos;
for (int i = optind; i < argc; ++i) {
if (verbose)
printf("%s: %i fragments\n", argv[i], readinput(argv[i], tspos, maxlen, pcrbegin, pcrend));
else
readinput(argv[i], tspos, maxlen, pcrbegin, pcrend);
tspos = spos;
}
if (debug > 1) {
for (list<stream_block>::const_iterator it = sblocks.begin(); it != sblocks.end(); ++it)
cout << *it << "\n ";
}
consolidate();
if (sblocks.size()) {
cerr << "Strange: Fragment list not empty!\n";
for (list<stream_block>::const_iterator it = sblocks.begin(); it != sblocks.end(); ++it)
cout << *it << "\n ";
}
if (verbose)
cout << merged_file.sdes.name() << ": " << merged_file.sdes << "\n (" << dec
<< merged_file.fragments.size() << " fragments, " << fixed << setprecision(0)
<< merged_file.sdes.len/125/merged_file.sdes.duration() << "kbit/s)\n";
if (outfile && !intr)
writefile(outfile, merged_file);
return 0;
}