From 91c076b9ebdac0f22fa4ee6de63702abcad559df Mon Sep 17 00:00:00 2001
From: Fl_GUI
Date: Sat, 28 Mar 2026 16:09:00 +0100
Subject: [PATCH 1/1] initial implementation

---
Notes (review revision): line breaks are restored and the XML inside two
string literals, which was missing from the reviewed copy, is reconstructed
(marked NOTE(review) in the code). Fixes: unclosed cache files, missing HTTP
status and timeout handling, template slice aliasing in Combine, a panic in
the pipe goroutine, a discarded signal stop function, feeds with named time
zones sorting as undated, and an unused pipe name constant. The blob hashes
on the index lines no longer applied and are left out.

 cache/cache.go                 | 111 ++++++++++++++++++
 cache/cachefactory.go          |  43 +++++++
 combine/combine.go             |  43 +++++++
 go.mod                         |   3 +
 main.go                        |  65 +++++++++++
 pipe/pipe.go                   |  76 ++++++++++++
 rsscollector/collector.go      | 208 +++++++++++++++++++++++++++++++++
 rsscollector/collector_test.go |  52 +++++++++
 rsstypes/rss.go                |  41 +++++++
 rsstypes/source_test.go        |  26 +++++
 10 files changed, 668 insertions(+)
 create mode 100644 cache/cache.go
 create mode 100644 cache/cachefactory.go
 create mode 100644 combine/combine.go
 create mode 100644 go.mod
 create mode 100644 main.go
 create mode 100644 pipe/pipe.go
 create mode 100644 rsscollector/collector.go
 create mode 100644 rsscollector/collector_test.go
 create mode 100644 rsstypes/rss.go
 create mode 100644 rsstypes/source_test.go

diff --git a/cache/cache.go b/cache/cache.go
new file mode 100644
--- /dev/null
+++ b/cache/cache.go
@@ -0,0 +1,111 @@
+// Package cache keeps fetched RSS documents on disk so that a feed is only
+// downloaded again once its cached copy has become stale.
+package cache
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"os"
+	"strings"
+	"time"
+)
+
+// fetchTimeout bounds a single download; http.DefaultClient has no timeout
+// and would wait forever on an unresponsive server.
+const fetchTimeout = 30 * time.Second
+
+// httpClient is the client every Cache downloads with.
+var httpClient = &http.Client{Timeout: fetchTimeout}
+
+// Cache is the on-disk copy of a single RSS feed.
+type Cache struct {
+	// Timeout is the maximum age of the cached file before it is fetched again.
+	Timeout  time.Duration
+	rssUrl   string
+	root     *os.Root
+	fileName string
+}
+
+// CreateCache returns a Cache for rssUrl with a Timeout of one hour. The feed
+// is fetched right away when no fresh copy is present in the cache directory.
+func CreateCache(rssUrl string) (Cache, error) {
+	var result Cache
+	result.Timeout = time.Hour
+	result.rssUrl = rssUrl
+	result.root = cacheFactory.root
+
+	parsedUrl, err := url.ParseRequestURI(rssUrl)
+	if err != nil {
+		return result, err
+	}
+	// One flat file per feed: host name followed by the path with "/" as ".".
+	result.fileName = fmt.Sprintf("%s%s", parsedUrl.Hostname(), strings.ReplaceAll(parsedUrl.Path, "/", "."))
+	if result.staleness() != nil {
+		return result, result.Fetch()
+	}
+	return result, nil
+}
+
+// Fetch downloads the feed and overwrites the cached file with the response
+// body. Nothing is written unless the server answers 200 OK, and a file that
+// could not be written completely is removed so that it is never served as a
+// fresh copy.
+func (c Cache) Fetch() error {
+	resp, err := httpClient.Get(c.rssUrl)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("fetching %s: unexpected status %s", c.rssUrl, resp.Status)
+	}
+
+	f, err := c.root.Create(c.fileName)
+	if err != nil {
+		return err
+	}
+	_, err = io.Copy(f, resp.Body)
+	// A failed Close can mean lost data, so it counts as a failed fetch too.
+	err = errors.Join(err, f.Close())
+	if err != nil {
+		// Best effort clean-up; the fetch error is the one worth reporting.
+		_ = c.root.Remove(c.fileName)
+	}
+	return err
+}
+
+// Source returns the URL this cache was created for.
+func (c Cache) Source() string {
+	return c.rssUrl
+}
+
+// Open returns a reader for the cached feed, fetching it first when the cached
+// copy is missing or older than Timeout. The caller must close the reader.
+func (c Cache) Open() (io.ReadCloser, error) {
+	if err := c.staleness(); err != nil {
+		fmt.Printf("fetching %s because of: %s\n", c.fileName, err)
+		if err := c.Fetch(); err != nil {
+			return nil, err
+		}
+	}
+
+	// file exists
+	return c.root.Open(c.fileName)
+}
+
+// staleness returns nil while the cached file exists and is younger than
+// Timeout; otherwise the returned error says why a fetch is needed.
+func (c Cache) staleness() error {
+	fileInfo, err := c.root.Stat(c.fileName)
+	if err != nil {
+		return err
+	}
+
+	if age := time.Since(fileInfo.ModTime()); c.Timeout < age {
+		return fmt.Errorf("timeout of %s exceeded", c.Timeout.String())
+	}
+	return nil
+}
diff --git a/cache/cachefactory.go b/cache/cachefactory.go
new file mode 100644
--- /dev/null
+++ b/cache/cachefactory.go
@@ -0,0 +1,43 @@
+package cache
+
+import (
+	"os"
+	"path/filepath"
+)
+
+// cacheFactory holds the directory that every Cache created by this package
+// reads from and writes to. It is set once by init.
+var cacheFactory factory
+
+// factory ties caches to a single root directory.
+type factory struct {
+	root *os.Root
+}
+
+// createFactory returns a factory whose caches live inside root.
+func createFactory(root *os.Root) factory {
+	return factory{root: root}
+}
+
+// init opens the "aggred-cache" directory inside the user's cache directory,
+// creating it first when needed. The package cannot work without that
+// directory, so any failure panics at start-up.
+func init() {
+	cacheDir, err := os.UserCacheDir()
+	if err != nil {
+		panic(err)
+	}
+	// filepath, not path: this is an OS path and not a slash-separated one.
+	cacheDir = filepath.Join(cacheDir, "aggred-cache")
+	// MkdirAll also covers a user cache directory that does not exist yet.
+	// 0750 rather than 0740: a directory without the execute bit cannot be
+	// entered by the group it is readable for.
+	if err := os.MkdirAll(cacheDir, 0o750); err != nil {
+		panic(err)
+	}
+	cacheRoot, err := os.OpenRoot(cacheDir)
+	if err != nil {
+		panic(err)
+	}
+	cacheFactory = createFactory(cacheRoot)
+}
diff --git a/combine/combine.go b/combine/combine.go
new file mode 100644
--- /dev/null
+++ b/combine/combine.go
@@ -0,0 +1,43 @@
+// Package combine merges the items of several RSS feeds into one feed.
+package combine
+
+import "aggred/rsstypes"
+
+// Combine returns a copy of template whose channel holds the template's own
+// items followed by the items of every feed in others, in argument order.
+// Items that come from a feed with a known Source are tagged with that
+// feed's title and URL.
+func Combine(template rsstypes.Rss, others ...rsstypes.Rss) rsstypes.Rss {
+	total := len(template.Channel.Items)
+	for _, feed := range others {
+		total += len(feed.Channel.Items)
+	}
+
+	// Build the result in a fresh slice: appending to the template's slice
+	// could write into spare capacity that the caller's template still owns.
+	items := make([]rsstypes.Item, 0, total)
+	items = append(items, template.Channel.Items...)
+	for _, feed := range others {
+		items = append(items, itemReferences(feed)...)
+	}
+	template.Channel.Items = items
+	return template
+}
+
+// itemReferences returns the items of rss. When rss.Source is set, every item
+// is first given a Source that refers back to the feed; this edits the items
+// of rss in place.
+func itemReferences(rss rsstypes.Rss) []rsstypes.Item {
+	if rss.Source == "" {
+		return rss.Channel.Items
+	}
+	reference := rsstypes.Source{
+		Title: rss.Channel.Title,
+		Url:   rss.Source,
+	}
+	for i := range rss.Channel.Items {
+		// edit in place
+		rss.Channel.Items[i].Source = reference
+	}
+	return rss.Channel.Items
+}
diff --git a/go.mod b/go.mod
new file mode 100644
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,3 @@
+module aggred
+
+go 1.25.0
diff --git a/main.go b/main.go
new file mode 100644
--- /dev/null
+++ b/main.go
@@ -0,0 +1,65 @@
+package main
+
+import (
+	"aggred/pipe"
+	"aggred/rsscollector"
+	"aggred/rsstypes"
+	"bytes"
+	"context"
+	"encoding/xml"
+	"fmt"
+	"os"
+	"os/signal"
+)
+
+// pipeName is the path of the named pipe the combined feed is served on.
+const pipeName = "combinedrss.namedpipe"
+
+// template returns the empty feed that the items of all sources are added to.
+func template() rsstypes.Rss {
+	// NOTE(review): the markup inside this literal was missing from the copy
+	// of the patch under review; it is reconstructed from the rsstypes struct
+	// tags - confirm against the author's original.
+	templateString := `
+	<rss version="2.0">
+	<channel>
+	<title>Combined</title>
+	</channel>
+	</rss>
+	`
+	var template rsstypes.Rss
+	err := xml.NewDecoder(bytes.NewBufferString(templateString)).Decode(&template)
+	if err != nil {
+		panic(err)
+	}
+	return template
+}
+
+// main serves the combined feed on the named pipe until interrupted.
+func main() {
+	nytimes := "https://rss.nytimes.com/services/xml/rss/nyt/Europe.xml"
+	hackaday := "https://hackaday.com/blog/feed/"
+	reader, err := rsscollector.NewRssCollector(template(), nytimes, hackaday)
+	if err != nil {
+		panic(err)
+	}
+	ctx, cancelCtx := context.WithCancel(context.Background())
+	defer cancelCtx()
+
+	closePipe, err := pipe.DoPipe(ctx, reader, pipeName)
+	if err != nil {
+		panic(err)
+	}
+	defer func() {
+		fmt.Println("closing named pipe")
+		if err := closePipe(); err != nil {
+			fmt.Println(err)
+		}
+	}()
+
+	// stop unregisters the signal handler; discarding it leaks the context.
+	signalCtx, stop := signal.NotifyContext(ctx, os.Interrupt)
+	defer stop()
+	<-signalCtx.Done()
+	fmt.Println("stopping after interrupt received")
+}
diff --git a/pipe/pipe.go b/pipe/pipe.go
new file mode 100644
--- /dev/null
+++ b/pipe/pipe.go
@@ -0,0 +1,76 @@
+// Package pipe serves content to readers of a named pipe (FIFO).
+package pipe
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"syscall"
+	"time"
+)
+
+// DoPipe creates a named pipe at pipePath, replacing any file already there,
+// and starts a goroutine that writes content to it once per reader until ctx
+// is cancelled. The returned function removes the pipe; when DoPipe fails,
+// calling it only reports the underlying error.
+func DoPipe(ctx context.Context, content io.ReadSeeker, pipePath string) (func() error, error) {
+	closer, err := ensurePipe(pipePath)
+	if err != nil {
+		return closer, fmt.Errorf("failed to create a fifo pipe file: %w", err)
+	}
+	go writeToPipe(ctx, content, pipePath)
+	return closer, nil
+}
+
+// ensurePipe removes whatever exists at pipeName and creates a fresh FIFO
+// there. The returned function removes the FIFO again; after a failure it only
+// reports that failure.
+func ensurePipe(pipeName string) (func() error, error) {
+	if err := os.Remove(pipeName); err != nil && !errors.Is(err, fs.ErrNotExist) {
+		return func() error { return err }, err
+	}
+	if err := syscall.Mkfifo(pipeName, 0666); err != nil {
+		return func() error { return err }, err
+	}
+	return func() error { return os.Remove(pipeName) }, nil
+}
+
+// writeToPipe writes content to the FIFO at pipeName over and over, one full
+// pass per reader, until ctx is cancelled or the FIFO can no longer be opened.
+// Cancellation is only noticed between passes: opening a FIFO for writing
+// blocks until a reader shows up.
+func writeToPipe(ctx context.Context, content io.ReadSeeker, pipeName string) {
+	for ctx.Err() == nil {
+		// No O_CREATE, so the permission argument is not used.
+		file, err := os.OpenFile(pipeName, os.O_WRONLY, 0)
+		if err != nil {
+			// Typically the FIFO was removed during shutdown. This runs in
+			// its own goroutine, so a panic would take the whole program down.
+			fmt.Println(err)
+			return
+		}
+		written, err := io.Copy(file, content)
+		fmt.Printf("Written %v bytes\n", written)
+		if err != nil {
+			fmt.Println(err)
+		}
+		// Rewind after every pass, not only after a failed one, so that the
+		// next reader gets the content from its first byte.
+		if _, err := content.Seek(0, io.SeekStart); err != nil {
+			fmt.Println(err)
+		}
+		if err := file.Close(); err != nil {
+			fmt.Println(err)
+		}
+
+		// allow time for reader of file to close the handle
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(time.Millisecond):
+		}
+	}
+}
diff --git a/rsscollector/collector.go b/rsscollector/collector.go
new file mode 100644
--- /dev/null
+++ b/rsscollector/collector.go
@@ -0,0 +1,208 @@
+// Package rsscollector exposes several cached RSS feeds as one combined feed.
+package rsscollector
+
+import (
+	"aggred/cache"
+	"aggred/combine"
+	"aggred/rsstypes"
+	"bytes"
+	"encoding/xml"
+	"errors"
+	"fmt"
+	"io"
+	"slices"
+	"strings"
+	"sync"
+	"time"
+)
+
+// maxWorkers limits how many sources are loaded at the same time.
+const maxWorkers = 5
+
+var (
+	errNoSources  = errors.New("no sources to load")
+	errNoneLoaded = errors.New("could not load a single source")
+	errSomeFailed = errors.New("could not load some sources")
+)
+
+// pubDateLayouts are the layouts tried, in order, for an item's pubDate. Feeds
+// write the zone as an offset or as a name and do not always pad the day.
+var pubDateLayouts = []string{
+	time.RFC1123Z,
+	time.RFC1123,
+	"Mon, 2 Jan 2006 15:04:05 -0700",
+	"Mon, 2 Jan 2006 15:04:05 MST",
+}
+
+// RssCollector renders the combined feed of its sources on demand. The
+// rendered document is buffered; once the buffer is drained, or reset with
+// Seek, the next Read or WriteTo renders it again from the sources.
+type RssCollector struct {
+	template rsstypes.Rss
+	sources  []cache.Cache
+	buff     bytes.Buffer
+}
+
+// NewRssCollector returns a collector that adds the items of every feed in
+// rsssources to template. Sources for which no cache could be created are left
+// out and reported in the returned error; the collector is returned either way.
+func NewRssCollector(template rsstypes.Rss, rsssources ...string) (io.ReadSeeker, error) {
+	rw := new(RssCollector)
+	rw.template = template
+
+	rw.sources = make([]cache.Cache, 0, len(rsssources))
+	var errs []error
+	for _, source := range rsssources {
+		c, err := cache.CreateCache(source)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("could not create a source for %v: %w", source, err))
+			continue
+		}
+		rw.sources = append(rw.sources, c)
+	}
+
+	return rw, errors.Join(errs...)
+}
+
+// Read reads from the rendered feed, rendering it first when nothing is
+// buffered. Because an empty buffer triggers a new render, Read does not
+// report io.EOF between documents; use WriteTo (which io.Copy picks up) to
+// get exactly one document.
+func (r *RssCollector) Read(p []byte) (n int, err error) {
+	err = r.ensureContent()
+	if err != nil {
+		return 0, err
+	}
+	return r.buff.Read(p)
+}
+
+// WriteTo writes everything that is buffered to w, rendering the feed first
+// when nothing is buffered. The buffer is empty afterwards.
+func (r *RssCollector) WriteTo(w io.Writer) (n int64, err error) {
+	err = r.ensureContent()
+	if err != nil {
+		return 0, err
+	}
+	return r.buff.WriteTo(w)
+}
+
+// Seek only supports rewinding to the start (offset 0 from io.SeekStart). It
+// drops whatever is still buffered, so the next read renders the feed again
+// instead of replaying the old bytes.
+func (r *RssCollector) Seek(offset int64, whence int) (int64, error) {
+	if whence != io.SeekStart {
+		return 0, errors.New("seeking is only supported from io.SeekStart")
+	}
+	if offset != 0 {
+		return 0, errors.New("seeking is only supported with 0 offset")
+	}
+	r.buff.Reset()
+	return 0, nil
+}
+
+// ensureContent renders the combined feed into the buffer unless unread
+// content is still buffered. Sources are loaded concurrently. The render only
+// fails when no source could be loaded at all; partial failures are printed
+// and the feeds that did load are combined.
+func (r *RssCollector) ensureContent() error {
+	if r.buff.Len() != 0 {
+		// some content left
+		return nil
+	}
+	if len(r.sources) == 0 {
+		return errNoSources
+	}
+
+	feeds := make([]rsstypes.Rss, len(r.sources))
+	errs := make([]error, len(r.sources))
+
+	var wg sync.WaitGroup
+	workers := make(chan struct{}, maxWorkers)
+	for i := range r.sources {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			workers <- struct{}{}
+			defer func() { <-workers }()
+
+			// Every goroutine only writes to its own index.
+			feeds[i], errs[i] = loadFeed(r.sources[i])
+		}()
+	}
+	wg.Wait()
+
+	failed := 0
+	for _, err := range errs {
+		if err != nil {
+			failed++
+		}
+	}
+	switch {
+	case failed == len(errs):
+		return fmt.Errorf("%w: %w", errNoneLoaded, errors.Join(errs...))
+	case failed != 0:
+		fmt.Println(fmt.Errorf("%w: %w", errSomeFailed, errors.Join(errs...)))
+	}
+
+	r.buff.Reset()
+	result := combine.Combine(r.template, feeds...)
+
+	// Stable, so items whose dates compare equal keep their feed order.
+	slices.SortStableFunc(result.Channel.Items, compareItems)
+
+	for i := range result.Channel.Items {
+		clearExtra(&result.Channel.Items[i])
+	}
+	if err := xml.NewEncoder(&r.buff).Encode(result); err != nil {
+		// Do not leave half a document behind for the next Read.
+		r.buff.Reset()
+		return err
+	}
+	return nil
+}
+
+// loadFeed opens source and decodes it as an RSS document that remembers the
+// URL it was loaded from. A feed that fails to decode is dropped entirely.
+func loadFeed(source cache.Cache) (rsstypes.Rss, error) {
+	reader, err := source.Open()
+	if err != nil {
+		return rsstypes.Rss{}, err
+	}
+	defer reader.Close()
+
+	var feed rsstypes.Rss
+	if err := xml.NewDecoder(reader).Decode(&feed); err != nil {
+		return rsstypes.Rss{}, fmt.Errorf("decoding %s: %w", source.Source(), err)
+	}
+	feed.Source = source.Source()
+	return feed, nil
+}
+
+// compareItems orders items newest first by pubDate. Items without a parsable
+// pubDate are treated as one day old.
+func compareItems(a, b rsstypes.Item) int {
+	fallback := time.Now().Add(-24 * time.Hour)
+	aTime := parsePubDate(a.PubDate, fallback)
+	bTime := parsePubDate(b.PubDate, fallback)
+	// negative to put newest first
+	return -aTime.Compare(bTime)
+}
+
+// parsePubDate parses pubDate with the first layout of pubDateLayouts that
+// fits and returns fallback when none does.
+func parsePubDate(pubDate string, fallback time.Time) time.Time {
+	pubDate = strings.TrimSpace(pubDate)
+	for _, layout := range pubDateLayouts {
+		if parsed, err := time.Parse(layout, pubDate); err == nil {
+			return parsed
+		}
+	}
+	return fallback
+}
+
+// clearExtra empties the fields that are only needed for sorting so that they
+// are left out when the item is encoded. NOTE(review): presumably because
+// Content already carries the original pubDate element - confirm.
+func clearExtra(item *rsstypes.Item) {
+	item.PubDate = ""
+}
diff --git a/rsscollector/collector_test.go b/rsscollector/collector_test.go
new file mode 100644
--- /dev/null
+++ b/rsscollector/collector_test.go
@@ -0,0 +1,52 @@
+package rsscollector
+
+import (
+	"bytes"
+	"io"
+	"testing"
+	"time"
+)
+
+var _ io.ReadSeeker = (*RssCollector)(nil)
+var _ io.WriterTo = (*RssCollector)(nil)
+
+// TestBufferLenZeroAfterFullWrite pins the bytes.Buffer behaviour that
+// ensureContent relies on: copying a buffer out drains it completely.
+func TestBufferLenZeroAfterFullWrite(t *testing.T) {
+	bufferFrom := bytes.NewBufferString("foobarspam")
+	var bufferTo bytes.Buffer
+	fullLength := bufferFrom.Len()
+
+	written, err := io.Copy(&bufferTo, bufferFrom)
+	if err != nil {
+		t.Fatalf("expected to copy without %v error but got %v", nil, err)
+	}
+	if written != int64(fullLength) {
+		t.Errorf("expected to write the full %v length of buffer but only wrote %v", fullLength, written)
+	}
+
+	if bufferTo.Len() != fullLength {
+		t.Errorf("expected receiving buffer to have %v unread bytes but got %v", fullLength, bufferTo.Len())
+	}
+	if bufferFrom.Len() != 0 {
+		t.Errorf("expected source buffer to have %v unread bytes but got %v", 0, bufferFrom.Len())
+	}
+}
+
+// TestParsePubDate covers the zone and day spellings seen in feeds.
+func TestParsePubDate(t *testing.T) {
+	fallback := time.Unix(0, 0)
+	for _, pubDate := range []string{
+		"Sat, 28 Mar 2026 16:09:00 +0100",
+		"Sat, 28 Mar 2026 15:09:00 GMT",
+		" Sat, 28 Mar 2026 15:09:00 GMT\n",
+		"Sun, 1 Mar 2026 09:00:00 GMT",
+	} {
+		if got := parsePubDate(pubDate, fallback); got.Equal(fallback) {
+			t.Errorf("expected %q to be parsed but got the fallback", pubDate)
+		}
+	}
+	if got := parsePubDate("yesterday", fallback); !got.Equal(fallback) {
+		t.Errorf("expected the fallback for an unparsable date but got %v", got)
+	}
+}
diff --git a/rsstypes/rss.go b/rsstypes/rss.go
new file mode 100644
--- /dev/null
+++ b/rsstypes/rss.go
@@ -0,0 +1,41 @@
+// Package rsstypes holds the part of RSS 2.0 that the aggregator reads and
+// writes.
+package rsstypes
+
+import (
+	"encoding/xml"
+)
+
+// Rss is the root element of a feed. Source is the URL the feed was loaded
+// from; it is set by the caller and is not part of the XML.
+type Rss struct {
+	XMLName xml.Name `xml:"rss"`
+	Version string   `xml:"version,attr"`
+	Channel Channel  `xml:"channel"`
+	Source  string   `xml:"-"`
+}
+
+// Channel is the single channel of a feed together with its items.
+type Channel struct {
+	XMLName xml.Name `xml:"channel"`
+	Title   string   `xml:"title"`
+	Items   []Item   `xml:"item"`
+}
+
+// Item is one entry of a channel. Content is the raw XML inside the item
+// element; Source and PubDate are additionally decoded from that element.
+//
+// NOTE: omitempty has no effect on the struct-typed Source field, so an item
+// without a source is still encoded with an empty source element.
+type Item struct {
+	Content string `xml:",innerxml"`
+	Source  Source `xml:"source,omitempty"`
+	PubDate string `xml:"pubDate,omitempty"`
+}
+
+// Source is the RSS source element: the name of the feed an item came from
+// and, as the url attribute, where that feed lives.
+type Source struct {
+	Title string `xml:",innerxml"`
+	Url   string `xml:"url,attr"`
+}
diff --git a/rsstypes/source_test.go b/rsstypes/source_test.go
new file mode 100644
--- /dev/null
+++ b/rsstypes/source_test.go
@@ -0,0 +1,26 @@
+package rsstypes
+
+import (
+	"bytes"
+	"encoding/xml"
+	"fmt"
+	"testing"
+)
+
+func TestSource(t *testing.T) {
+	expectedSource := Source{Title: "perdu", Url: "perdu.com"}
+	// NOTE(review): the markup inside this literal was missing from the copy of
+	// the patch under review; reconstructed from the Source struct tags - confirm.
+	xmlString := fmt.Sprintf(`<source url="%s">%s</source>`, expectedSource.Url, expectedSource.Title)
+	var source Source
+	err := xml.NewDecoder(bytes.NewBufferString(xmlString)).Decode(&source)
+	if err != nil {
+		t.Errorf("expected %v error, but got %v", nil, err)
+	}
+	if source.Title != expectedSource.Title {
+		t.Errorf("expected %v title but got %v", expectedSource.Title, source.Title)
+	}
+	if source.Url != expectedSource.Url {
+		t.Errorf("expected %v url but got %v", expectedSource.Url, source.Url)
+	}
+}
-- 
2.49.0