--- /dev/null
+package cache
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "os"
+ "strings"
+ "time"
+)
+
+type Cache struct {
+ Timeout time.Duration
+ rssUrl string
+ root *os.Root
+ fileName string
+}
+
+func CreateCache(rssUrl string) (Cache, error) {
+ var result Cache
+ result.Timeout = time.Hour
+ result.rssUrl = rssUrl
+ result.root = cacheFactory.root
+
+ parsedUrl, err := url.ParseRequestURI(rssUrl)
+ if err != nil {
+ return result, err
+ }
+ result.fileName = fmt.Sprintf("%s%s", parsedUrl.Hostname(), strings.ReplaceAll(parsedUrl.Path, "/", "."))
+ if result.staleness() != nil {
+ return result, result.Fetch()
+ }
+ return result, nil
+}
+
+func (c Cache) Fetch() error {
+ resp, err := http.Get(c.rssUrl)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ f, err := c.root.Create(c.fileName)
+ if err != nil {
+ return err
+ }
+ _, err = io.Copy(f, resp.Body)
+ return err
+}
+
+func (c Cache) Source() string {
+ return c.rssUrl
+}
+
+func (c Cache) Open() (io.ReadCloser, error) {
+ err := c.staleness()
+ if err != nil {
+ fmt.Printf("fetching %s because of: %s\n", c.fileName, err)
+ err := c.Fetch()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // file exists
+ return c.root.Open(c.fileName)
+}
+
+func (c Cache) staleness() error {
+ fileInfo, err := c.root.Stat(c.fileName)
+
+ if err != nil {
+ return err
+ }
+
+ timeSince := time.Since(fileInfo.ModTime())
+ if c.Timeout < timeSince {
+ return fmt.Errorf("timeout of %s exceeded", c.Timeout.String())
+ }
+
+ return nil
+}
--- /dev/null
+package cache
+
+import (
+ "errors"
+ "io/fs"
+ "os"
+ "path"
+)
+
+var cacheFactory factory
+
+type factory struct {
+ root *os.Root
+}
+
+func createFactory(root *os.Root) factory {
+ return factory{root}
+}
+
+func init() {
+ cacheDir, err := os.UserCacheDir()
+ if err != nil {
+ panic(err)
+ }
+ cacheDir = path.Join(cacheDir, "aggred-cache")
+ err = os.Mkdir(cacheDir, fs.ModeDir|0740)
+ if err != nil && !errors.Is(err, fs.ErrExist) {
+ panic(err)
+ }
+ cacheRoot, err := os.OpenRoot(cacheDir)
+ if err != nil {
+ panic(err)
+ }
+ cacheFactory = createFactory(cacheRoot)
+}
--- /dev/null
+package combine
+
+import "aggred/rsstypes"
+
+func Combine(template rsstypes.Rss, others ...rsstypes.Rss) rsstypes.Rss {
+ var items = template.Channel.Items
+ for _, feed := range others {
+ items = append(items, itemReferences(feed)...)
+ }
+ template.Channel.Items = items
+ return template
+}
+
+func itemReferences(rss rsstypes.Rss) []rsstypes.Item {
+ title := rss.Channel.Title
+ source := rss.Source
+ if source == "" {
+ return rss.Channel.Items
+ }
+ for i := range rss.Channel.Items {
+ // edit in place
+ rss.Channel.Items[i].Source = rsstypes.Source{
+ title,
+ source,
+ }
+ }
+ return rss.Channel.Items
+}
--- /dev/null
+module aggred
+
+go 1.25.0
--- /dev/null
+package main
+
+import (
+ "aggred/pipe"
+ "aggred/rsscollector"
+ "aggred/rsstypes"
+ "bytes"
+ "context"
+ "encoding/xml"
+ "fmt"
+ "os"
+ "os/signal"
+)
+
+const pipeName = "pipe.io"
+
+func template() rsstypes.Rss {
+ templateString := `
+ <rss version="2.0">
+ <channel>
+ <title>Combined</title>
+ </channel>
+ </rss>
+ `
+ var template rsstypes.Rss
+ err := xml.NewDecoder(bytes.NewBufferString(templateString)).Decode(&template)
+ if err != nil {
+ panic(err)
+ }
+ return template
+}
+
+func main() {
+ nytimes := "https://rss.nytimes.com/services/xml/rss/nyt/Europe.xml"
+ hackaday := "https://hackaday.com/blog/feed/"
+ reader, err := rsscollector.NewRssCollector(template(), nytimes, hackaday)
+ if err != nil {
+ panic(err)
+ }
+ ctx, cancelCtx := context.WithCancel(context.Background())
+ defer cancelCtx()
+
+ closePipe, err := pipe.DoPipe(ctx, reader, "combinedrss.namedpipe")
+ if err != nil {
+ panic(err)
+ }
+ defer func() {
+ fmt.Println("closing named pipe")
+ if err := closePipe(); err != nil {
+ fmt.Println(err)
+ }
+ }()
+
+ signalCtx, _ := signal.NotifyContext(ctx, os.Interrupt)
+ <-signalCtx.Done()
+ fmt.Println("stopping after interrupt received")
+}
--- /dev/null
+package pipe
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "io/fs"
+ "os"
+ "syscall"
+ "time"
+)
+
+func DoPipe(ctx context.Context, content io.ReadSeeker, pipePath string) (func() error, error) {
+ closer, err := ensurePipe(pipePath)
+ if err != nil {
+ return closer, fmt.Errorf("failed to create a fifo pipe file: %w", err)
+ }
+ go writeToPipe(ctx, content, pipePath)
+ return closer, nil
+}
+
+func ensurePipe(pipeName string) (func() error, error) {
+ if err := os.Remove(pipeName); err != nil {
+ if !errors.Is(err, fs.ErrNotExist) {
+ return func() error { return err }, err
+ }
+ }
+ err := syscall.Mkfifo(pipeName, 0666)
+ if err != nil {
+ return func() error { return err }, err
+ }
+ return func() error { return os.Remove(pipeName) }, nil
+}
+
+func writeToPipe(ctx context.Context, content io.ReadSeeker, pipeName string) {
+ fileFlags := os.O_WRONLY | os.O_APPEND
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ file, err := os.OpenFile(pipeName, fileFlags, fs.ModeNamedPipe)
+ if err != nil {
+ panic(err)
+ }
+ file.WriteString("")
+ bytes, err := io.Copy(file, content)
+ fmt.Printf("Written %v bytes\n", bytes)
+ if err != nil {
+ fmt.Println(err)
+ fmt.Println("seeking start of content")
+ content.Seek(0, io.SeekStart)
+ }
+ err = file.Close()
+ if err != nil {
+ fmt.Println(err)
+ }
+ // allow time for reader of file to close the handle
+ <-time.After(time.Millisecond)
+ }
+ }
+}
--- /dev/null
+package rsscollector
+
+import (
+ "aggred/cache"
+ "aggred/combine"
+ "aggred/rsstypes"
+ "bytes"
+ "encoding/xml"
+ "errors"
+ "fmt"
+ "io"
+ "slices"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type RssCollector struct {
+ template rsstypes.Rss
+ sources []cache.Cache
+ buff bytes.Buffer
+}
+
+func NewRssCollector(template rsstypes.Rss, rsssources ...string) (io.ReadSeeker, error) {
+ rw := new(RssCollector)
+ rw.template = template
+
+ rw.sources = make([]cache.Cache, 0, len(rsssources))
+ errs := make([]error, 0)
+ for _, source := range rsssources {
+ c, err := cache.CreateCache(source)
+ if err != nil {
+ errs = append(errs, fmt.Errorf("could not create a source for %v: %w", source, err))
+ } else {
+ rw.sources = append(rw.sources, c)
+ }
+ }
+
+ return rw, errors.Join(errs...)
+}
+
+func (r *RssCollector) Read(p []byte) (n int, err error) {
+ err = r.ensureContent()
+ if err != nil {
+ return 0, err
+ }
+ return r.buff.Read(p)
+}
+
+func (r *RssCollector) WriteTo(w io.Writer) (n int64, err error) {
+ err = r.ensureContent()
+ if err != nil {
+ return 0, err
+ }
+ return r.buff.WriteTo(w)
+}
+
+func (r *RssCollector) Seek(offset int64, whence int) (int64, error) {
+ if whence != io.SeekStart {
+ return 0, errors.New("seeking is only supported from io.SeekStart")
+ }
+ if offset != 0 {
+ return 0, errors.New("seeking is only supported with 0 offset")
+ }
+ r.buff.Reset()
+ return 0, nil
+}
+
+func (r *RssCollector) ensureContent() error {
+ if r.buff.Len() != 0 {
+ // some content left
+ return nil
+ }
+
+ feeds := make([]rsstypes.Rss, len(r.sources))
+ errs := make([]error, len(r.sources))
+ errCount := atomic.Int32{}
+
+ wg := sync.WaitGroup{}
+ workers := make(chan struct{}, 5)
+ for i := range r.sources {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ workers <- struct{}{}
+ defer func() { <-workers }()
+
+ source := r.sources[i]
+ reader, err := source.Open()
+ if err != nil {
+ errCount.Add(1)
+ errs[i] = err
+ return
+ }
+ decoder := xml.NewDecoder(reader)
+
+ err = decoder.Decode(&feeds[i])
+ if err != nil {
+ errCount.Add(1)
+ errs[i] = err
+ return
+ }
+
+ feeds[i].Source = r.sources[i].Source()
+ }()
+ }
+
+ wg.Wait()
+
+ if errCount.Load() == int32(len(errs)) {
+ noneLoadedErr := errors.New("could not load a single source")
+ return fmt.Errorf("%w: %w", noneLoadedErr, errors.Join(errs...))
+ } else if errCount.Load() != 0 {
+ someFailedErr := errors.New("could not load some sources")
+ err := fmt.Errorf("%w: %w", someFailedErr, errors.Join(errs...))
+ fmt.Println(err)
+ }
+
+ r.buff.Reset()
+ result := combine.Combine(r.template, feeds...)
+
+ slices.SortFunc(result.Channel.Items, compareItems)
+
+ for i := range result.Channel.Items {
+ clearExtra(&result.Channel.Items[i])
+ }
+ return xml.NewEncoder(&r.buff).Encode(result)
+}
+
+func compareItems(a, b rsstypes.Item) int {
+ var aTime, bTime time.Time
+ var err error
+ defaultTime := time.Now().Add(-time.Hour * 24)
+ aPub := a.PubDate
+ bPub := b.PubDate
+
+ aTime, err = time.Parse(time.RFC1123Z, aPub)
+ if err != nil {
+ aTime = defaultTime
+ }
+
+ bTime, err = time.Parse(time.RFC1123Z, bPub)
+ if err != nil {
+ bTime = defaultTime
+ }
+
+ // negative to put newest first
+ return -aTime.Compare(bTime)
+}
+
+func clearExtra(item *rsstypes.Item) {
+ item.PubDate = ""
+}
--- /dev/null
+package rsscollector
+
+import (
+ "bytes"
+ "io"
+ "testing"
+)
+
+var _ io.ReadSeeker = (*RssCollector)(nil)
+var _ io.WriterTo = (*RssCollector)(nil)
+
+func TestBufferLenZeroAfterFullWrite(t *testing.T) {
+ bufferFrom := bytes.NewBufferString("foobarspam")
+ var bufferTo bytes.Buffer
+ fullLength := bufferFrom.Len()
+
+ bytes, err := io.Copy(&bufferTo, bufferFrom)
+ if err != nil {
+ t.Fatalf("expected to copy without %v error but got %v", nil, err)
+ }
+ if bytes != int64(fullLength) {
+ t.Errorf("expected to write the full %v length of buffer but only wrote %v", fullLength, bytes)
+ }
+
+ if bufferTo.Len() != fullLength {
+ t.Errorf("expected receiving buffer to have %v unread bytes but got %v", fullLength, bufferTo.Len())
+ }
+}
--- /dev/null
+package rsstypes
+
+import (
+ "encoding/xml"
+)
+
+type Rss struct {
+ XMLName xml.Name `xml:"rss"`
+ Version string `xml:"version,attr"`
+ Channel Channel `xml:"channel"`
+ Source string `xml:"-"`
+}
+
+type Channel struct {
+ XMLName xml.Name `xml:"channel"`
+ Title string `xml:"title"`
+ Items []Item `xml:"item"`
+}
+
+type Item struct {
+ Content string `xml:",innerxml"`
+ Source Source `xml:"source,omitempty"`
+ PubDate string `xml:"pubDate,omitempty"`
+}
+
+type Source struct {
+ Title string `xml:",innerxml"`
+ Url string `xml:"url,attr"`
+}
--- /dev/null
+package rsstypes
+
+import (
+ "bytes"
+ "encoding/xml"
+ "fmt"
+ "testing"
+)
+
+func TestSource(t *testing.T) {
+ expectedSource := Source{"perdu", "perdu.com"}
+ xmlString := fmt.Sprintf(`<source url="%s">%s</source>`, expectedSource.Url, expectedSource.Title)
+ var source Source
+ err := xml.NewDecoder(bytes.NewBufferString(xmlString)).Decode(&source)
+ if err != nil {
+ t.Errorf("expected %v error, but got %v", nil, err)
+ }
+ if source.Title != expectedSource.Title {
+ t.Errorf("expected %v title but got %v", expectedSource.Title, source.Title)
+ }
+ if source.Url != expectedSource.Url {
+ t.Errorf("expected %v url but got %v", expectedSource.Url, source.Url)
+ }
+}